exchange_test.rs 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581
  1. use std::collections::{BTreeMap};
  2. use std::io::{Error};
  3. use std::sync::Arc;
  4. use std::sync::atomic::AtomicBool;
  5. use futures::StreamExt;
  6. use rust_decimal_macros::dec;
  7. use tokio::sync::mpsc::{channel, Receiver, Sender};
  8. use tokio::sync::Mutex;
  9. use tokio::try_join;
  10. use tracing::{error, trace};
  11. // use exchanges::binance_spot_ws::{BinanceSpotLogin, BinanceSpotSubscribeType, BinanceSpotWs, BinanceSpotWsType};
  12. // use exchanges::binance_swap_ws::{BinanceSwapLogin, BinanceSwapSubscribeType, BinanceSwapWs, BinanceSwapWsType};
  13. // use exchanges::kucoin_swap_ws::{KucoinSwapLogin, KucoinSwapSubscribeType, KucoinSwapWs, KucoinSwapWsType};
  14. // use exchanges::kucoin_spot_ws::{KucoinSpotLogin, KucoinSpotSubscribeType, KucoinSpotWs, KucoinSpotWsType};
  15. // use exchanges::gate_swap_ws::{GateSwapLogin, GateSwapSubscribeType, GateSwapWs, GateSwapWsType};
  16. // use exchanges::bitget_spot_ws::{BitgetSpotLogin, BitgetSpotSubscribeType, BitgetSpotWs, BitgetSpotWsType};
  17. use exchanges::okx_swap_ws::{OkxSwapLogin, OkxSwapSubscribeType, OkxSwapWs, OkxSwapWsType};
  18. use exchanges::response_base::ResponseData;
  19. use standard::exchange::{Exchange, ExchangeEnum};
  20. // use standard::{binance_spot_handle, Order, Platform, utils};
  21. // use standard::{binance_handle, Order, Platform, utils};
  22. // use standard::{kucoin_handle, Order, Platform, utils};
  23. // use standard::{kucoin_spot_handle, Order, Platform, utils};
  24. // use standard::{gate_handle, Order, Platform, utils};
  25. // use standard::{bitget_spot_handle, Order, Platform, utils};
  26. use standard::{okx_handle, Order, Platform, utils};
  27. // 创建实体
  28. #[allow(dead_code)]
  29. pub async fn test_new_exchange(exchange: ExchangeEnum, symbol: &str) -> Box<dyn Platform> {
  30. utils::proxy_handle();
  31. let (order_sender, _order_receiver): (Sender<Order>, Receiver<Order>) = channel(1024);
  32. let (error_sender, _error_receiver): (Sender<Error>, Receiver<Error>) = channel(1024);
  33. let account_info = global::account_info::get_account_info("../test_account.toml");
  34. match exchange {
  35. ExchangeEnum::BinanceSwap => {
  36. let mut params: BTreeMap<String, String> = BTreeMap::new();
  37. let access_key = account_info.binance_access_key;
  38. let secret_key = account_info.binance_secret_key;
  39. params.insert("access_key".to_string(), access_key);
  40. params.insert("secret_key".to_string(), secret_key);
  41. Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  42. }
  43. ExchangeEnum::BinanceSpot => {
  44. let mut params: BTreeMap<String, String> = BTreeMap::new();
  45. let access_key = account_info.binance_access_key;
  46. let secret_key = account_info.binance_secret_key;
  47. params.insert("access_key".to_string(), access_key);
  48. params.insert("secret_key".to_string(), secret_key);
  49. Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  50. }
  51. ExchangeEnum::GateSwap => {
  52. let mut params: BTreeMap<String, String> = BTreeMap::new();
  53. let access_key = account_info.gate_access_key;
  54. let secret_key = account_info.gate_secret_key;
  55. params.insert("access_key".to_string(), access_key);
  56. params.insert("secret_key".to_string(), secret_key);
  57. Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  58. }
  59. ExchangeEnum::GateSpot => {
  60. let mut params: BTreeMap<String, String> = BTreeMap::new();
  61. let access_key = account_info.gate_access_key;
  62. let secret_key = account_info.gate_secret_key;
  63. params.insert("access_key".to_string(), access_key);
  64. params.insert("secret_key".to_string(), secret_key);
  65. Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  66. }
  67. ExchangeEnum::KucoinSwap => {
  68. let mut params: BTreeMap<String, String> = BTreeMap::new();
  69. let access_key = account_info.kucoin_access_key;
  70. let secret_key = account_info.kucoin_secret_key;
  71. let pass_key = account_info.kucoin_pass;
  72. params.insert("access_key".to_string(), access_key);
  73. params.insert("secret_key".to_string(), secret_key);
  74. params.insert("pass_key".to_string(), pass_key);
  75. Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  76. }
  77. ExchangeEnum::KucoinSpot => {
  78. let mut params: BTreeMap<String, String> = BTreeMap::new();
  79. let access_key = account_info.kucoin_access_key;
  80. let secret_key = account_info.kucoin_secret_key;
  81. let pass_key = account_info.kucoin_pass;
  82. params.insert("access_key".to_string(), access_key);
  83. params.insert("secret_key".to_string(), secret_key);
  84. params.insert("pass_key".to_string(), pass_key);
  85. Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  86. }
  87. ExchangeEnum::OkxSwap => {
  88. let mut params: BTreeMap<String, String> = BTreeMap::new();
  89. let access_key = account_info.okx_access_key;
  90. let secret_key = account_info.okx_secret_key;
  91. let pass_key = account_info.okx_pass;
  92. params.insert("access_key".to_string(), access_key);
  93. params.insert("secret_key".to_string(), secret_key);
  94. params.insert("pass_key".to_string(), pass_key);
  95. Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  96. }
  97. ExchangeEnum::BitgetSpot => {
  98. let mut params: BTreeMap<String, String> = BTreeMap::new();
  99. let access_key = account_info.bitget_access_key;
  100. let secret_key = account_info.bitget_secret_key;
  101. let pass_key = account_info.bitget_pass;
  102. params.insert("access_key".to_string(), access_key);
  103. params.insert("secret_key".to_string(), secret_key);
  104. params.insert("pass_key".to_string(), pass_key);
  105. Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  106. }
  107. }
  108. }
  109. #[allow(dead_code)]
  110. pub async fn test_new_exchange_wss<T>(exchange: ExchangeEnum, symbol: &str, subscriber_type: T, mold: &str) where Vec<OkxSwapSubscribeType>: From<T> {
  111. utils::proxy_handle();
  112. let account_info = global::account_info::get_account_info("../test_account.toml");
  113. match exchange {
  114. ExchangeEnum::BinanceSpot => {
  115. // let symbol_format = utils::format_symbol(symbol.to_string(), "").to_uppercase();
  116. // trace!(symbol_format);
  117. // let name = format!("binance_spot@{}", symbol.to_string().to_lowercase());
  118. // let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  119. // let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
  120. // let write_tx_am = Arc::new(Mutex::new(write_tx));
  121. // let bool_v1 = Arc::new(AtomicBool::new(true));
  122. //
  123. // let params = BinanceSpotLogin {
  124. // api_key: account_info.binance_access_key,
  125. // api_secret: account_info.binance_secret_key,
  126. // };
  127. // let mut exchange_wss;
  128. // exchange_wss = BinanceSpotWs::new_label(name, false, Option::from(params), BinanceSpotWsType::PublicAndPrivate);
  129. // exchange_wss.set_symbols(vec![symbol_format]);
  130. // exchange_wss.set_subscribe(subscriber_type.into());
  131. //
  132. //
  133. // let mold_arc = Arc::new(mold.to_string());
  134. // //读取
  135. // tokio::spawn(async move {
  136. // let mold_clone = Arc::clone(&mold_arc);
  137. // loop {
  138. // if let Some(data) = read_rx.next().await {
  139. // trace!("原始数据 data:{:?}",data);
  140. // match mold_clone.as_str() {
  141. // "depth" => {
  142. // if data.data != "" {
  143. // let result = binance_spot_handle::handle_special_depth(data);
  144. // trace!(?result)
  145. // }
  146. // }
  147. // "ticker" => {
  148. // if data.data != "" {
  149. // let result = binance_spot_handle::handle_special_ticker(data);
  150. // trace!(?result)
  151. // }
  152. // }
  153. // _ => {
  154. // error!("没有该命令!mode={}", mold_clone);
  155. // panic!("没有该命令!mode={}", mold_clone)
  156. // }
  157. // }
  158. // }
  159. // };
  160. // });
  161. //
  162. // let t1 = tokio::spawn(async move {
  163. // //链接
  164. // let bool_v3_clone = Arc::clone(&bool_v1);
  165. // exchange_wss.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  166. // });
  167. // try_join!(t1).unwrap();
  168. }
  169. ExchangeEnum::BinanceSwap => {
  170. // let symbol_format = utils::format_symbol(symbol.to_string(), "").to_uppercase();
  171. // trace!(symbol_format);
  172. // let name = format!("binance_swap@{}", symbol.to_string().to_lowercase());
  173. // let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  174. // let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
  175. // let write_tx_am = Arc::new(Mutex::new(write_tx));
  176. // let bool_v1 = Arc::new(AtomicBool::new(true));
  177. //
  178. // let params = BinanceSwapLogin {
  179. // api_key: account_info.binance_access_key,
  180. // api_secret: account_info.binance_secret_key,
  181. // };
  182. // let mut exchange_wss;
  183. // exchange_wss = BinanceSwapWs::new_label(name, false, Option::from(params), BinanceSwapWsType::PublicAndPrivate);
  184. // exchange_wss.set_symbols(vec![symbol_format]);
  185. // exchange_wss.set_subscribe(subscriber_type.into());
  186. //
  187. //
  188. // let mold_arc = Arc::new(mold.to_string());
  189. // //读取
  190. // tokio::spawn(async move {
  191. // let mold_clone = Arc::clone(&mold_arc);
  192. // loop {
  193. // if let Some(data) = read_rx.next().await {
  194. // trace!("原始数据 data:{:?}",data);
  195. // match mold_clone.as_str() {
  196. // "depth" => {
  197. // if data.data != "" {
  198. // let result = binance_handle::handle_special_depth(data);
  199. // trace!(?result)
  200. // }
  201. // }
  202. // "ticker" => {
  203. // if data.data != "" {
  204. // let result = binance_handle::handle_special_ticker(data);
  205. // trace!(?result)
  206. // }
  207. // }
  208. // _ => {
  209. // error!("没有该命令!mode={}", mold_clone);
  210. // panic!("没有该命令!mode={}", mold_clone)
  211. // }
  212. // }
  213. // }
  214. // };
  215. // });
  216. //
  217. // let t1 = tokio::spawn(async move {
  218. // //链接
  219. // let bool_v3_clone = Arc::clone(&bool_v1);
  220. // exchange_wss.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  221. // });
  222. // try_join!(t1).unwrap();
  223. }
  224. ExchangeEnum::KucoinSwap => {
  225. // let symbol_format = format!("{}M", utils::format_symbol(symbol.to_string(), ""));
  226. // let symbol_back = utils::format_symbol(symbol.to_string(), "_");
  227. //
  228. // let name = format!("kucoin_swap@{}", symbol.to_string().to_lowercase());
  229. // let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  230. // let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
  231. // let write_tx_am = Arc::new(Mutex::new(write_tx));
  232. // let bool_v1 = Arc::new(AtomicBool::new(true));
  233. //
  234. // let params = KucoinSwapLogin {
  235. // access_key: account_info.kucoin_access_key,
  236. // secret_key: account_info.kucoin_secret_key,
  237. // pass_key: account_info.kucoin_pass,
  238. // };
  239. // let mut exchange_wss;
  240. // if ["depth", "ticker"].contains(&mold) {
  241. // exchange_wss = KucoinSwapWs::new_label(name, false, Option::from(params), KucoinSwapWsType::Public).await;
  242. // } else {
  243. // exchange_wss = KucoinSwapWs::new_label(name, false, Option::from(params), KucoinSwapWsType::Private).await;
  244. // }
  245. // exchange_wss.set_symbols(vec![symbol_format]);
  246. // exchange_wss.set_subscribe(subscriber_type.into());
  247. //
  248. // let mold_arc = Arc::new(mold.to_string());
  249. // tokio::spawn(async move {
  250. // let mold_clone = Arc::clone(&mold_arc);
  251. // loop {
  252. // if let Some(data) = read_rx.next().await {
  253. // trace!("原始数据 data:{:?}",data);
  254. // match mold_clone.as_str() {
  255. // "depth" => {
  256. // let result = kucoin_handle::handle_special_depth(data);
  257. // trace!(?result)
  258. // }
  259. // "ticker" => {
  260. // let result = kucoin_handle::handle_special_ticker(data);
  261. // trace!(?result)
  262. // }
  263. // "account" => {
  264. // let result = kucoin_handle::handle_account_info(data, symbol_back.clone());
  265. // trace!(?result)
  266. // }
  267. // "position" => {
  268. // let result = kucoin_handle::handle_position(data, dec!(1));
  269. // trace!(?result)
  270. // }
  271. // "orders" => {
  272. // let result = kucoin_handle::handle_order(data, dec!(0.001));
  273. // trace!(?result)
  274. // }
  275. // _ => {
  276. // error!("没有该命令!mode={}", mold_clone);
  277. // panic!("没有该命令!mode={}", mold_clone)
  278. // }
  279. // }
  280. // }
  281. // }
  282. // });
  283. //
  284. // let t1 = tokio::spawn(async move {
  285. // //链接
  286. // let bool_v3_clone = Arc::clone(&bool_v1);
  287. // exchange_wss.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  288. // });
  289. // try_join!(t1).unwrap();
  290. }
  291. ExchangeEnum::KucoinSpot => {
  292. // let symbol_format = utils::format_symbol(symbol.to_string(), "-").to_uppercase();
  293. // let symbol_back = utils::format_symbol(symbol.to_string(), "_");
  294. // trace!(symbol_format);
  295. // let name = format!("kucoin_spot@{}", symbol.to_string().to_lowercase());
  296. // let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  297. // let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
  298. // let write_tx_am = Arc::new(Mutex::new(write_tx));
  299. // let bool_v1 = Arc::new(AtomicBool::new(true));
  300. //
  301. // let params = KucoinSpotLogin {
  302. // access_key: account_info.kucoin_access_key,
  303. // secret_key: account_info.kucoin_secret_key,
  304. // pass_key: account_info.kucoin_pass,
  305. // };
  306. // let mut exchange_wss = if ["depth", "ticker"].contains(&mold) {
  307. // KucoinSpotWs::new_label(name, false, Option::from(params), KucoinSpotWsType::Public).await
  308. // } else {
  309. // KucoinSpotWs::new_label(name, false, Option::from(params), KucoinSpotWsType::Private).await
  310. // };
  311. // exchange_wss.set_symbols(vec![symbol_format]);
  312. // exchange_wss.set_subscribe(subscriber_type.into());
  313. //
  314. // let mold_arc = Arc::new(mold.to_string());
  315. // tokio::spawn(async move {
  316. // let mold_clone = Arc::clone(&mold_arc);
  317. // loop {
  318. // if let Some(data) = read_rx.next().await {
  319. // trace!("原始数据 data:{:?}",data);
  320. // match mold_clone.as_str() {
  321. // "depth" => {
  322. // if data.data != "" {
  323. // let result = kucoin_spot_handle::handle_special_depth(data);
  324. // trace!(?result)
  325. // }
  326. // }
  327. // "ticker" => {
  328. // if data.data != "" {
  329. // let result = kucoin_spot_handle::handle_special_ticker(data);
  330. // trace!(?result)
  331. // }
  332. // }
  333. // "account" => {
  334. // if data.data != "" {
  335. // let result = kucoin_spot_handle::handle_account_info(data, symbol_back.clone());
  336. // trace!(?result)
  337. // }
  338. // }
  339. // "orders" => {
  340. // if data.data != "" {
  341. // let result = kucoin_spot_handle::handle_order(data, dec!(1));
  342. // trace!(?result)
  343. // }
  344. // }
  345. // _ => {
  346. // error!("没有该命令!mode={}", mold_clone);
  347. // panic!("没有该命令!mode={}", mold_clone)
  348. // }
  349. // }
  350. // }
  351. // }
  352. // });
  353. // let t1 = tokio::spawn(async move {
  354. // //链接
  355. // let bool_v3_clone = Arc::clone(&bool_v1);
  356. // exchange_wss.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  357. // });
  358. // try_join!(t1).unwrap();
  359. }
  360. ExchangeEnum::GateSwap => {
  361. // let symbol_format = utils::format_symbol(symbol.to_string(), "_").to_uppercase();
  362. // trace!(symbol_format);
  363. // let name = format!("gate_swap@{}", symbol.to_string().to_lowercase());
  364. // let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  365. // let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
  366. // let write_tx_am = Arc::new(Mutex::new(write_tx));
  367. // let bool_v1 = Arc::new(AtomicBool::new(true));
  368. //
  369. // let params = GateSwapLogin {
  370. // api_key: account_info.gate_access_key,
  371. // secret: account_info.gate_secret_key,
  372. // };
  373. // let mut exchange_wss = GateSwapWs::new_label(name, false, Option::from(params), GateSwapWsType::PublicAndPrivate("usdt".to_string()));
  374. // exchange_wss.set_symbols(vec![symbol_format.clone()]);
  375. // exchange_wss.set_subscribe(subscriber_type.into());
  376. //
  377. // let mold_arc = Arc::new(mold.to_string());
  378. // tokio::spawn(async move {
  379. // let mold_clone = Arc::clone(&mold_arc);
  380. // loop {
  381. // if let Some(data) = read_rx.next().await {
  382. // trace!("原始数据 data:{:?}",data);
  383. // match mold_clone.as_str() {
  384. // "depth" => {
  385. // if data.data != "" {
  386. // let result = gate_handle::handle_special_depth(data);
  387. // trace!(?result)
  388. // }
  389. // }
  390. // "ticker" => {
  391. // if data.data != "" {
  392. // let result = gate_handle::handle_special_ticker(data);
  393. // trace!(?result)
  394. // }
  395. // }
  396. // "account" => {
  397. // if data.data != "" {
  398. // let result = gate_handle::handle_account_info(data, symbol_format.clone());
  399. // trace!(?result)
  400. // }
  401. // }
  402. // "orders" => {
  403. // if data.data != "" {
  404. // let result = gate_handle::handle_order(data, dec!(1));
  405. // trace!(?result)
  406. // }
  407. // }
  408. // _ => {
  409. // error!("没有该命令!mode={}", mold_clone);
  410. // panic!("没有该命令!mode={}", mold_clone)
  411. // }
  412. // }
  413. // }
  414. // }
  415. // });
  416. // let t1 = tokio::spawn(async move {
  417. // //链接
  418. // let bool_v3_clone = Arc::clone(&bool_v1);
  419. // exchange_wss.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  420. // });
  421. // try_join!(t1).unwrap();
  422. }
  423. ExchangeEnum::BitgetSpot => {
  424. // let symbol_format = utils::format_symbol(symbol.to_string(), "-").to_uppercase();
  425. // let symbol_back = utils::format_symbol(symbol.to_string(), "_");
  426. // trace!(symbol_format);
  427. // let name = format!("bitget_spot@{}", symbol.to_string().to_lowercase());
  428. // let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  429. // let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
  430. // let write_tx_am = Arc::new(Mutex::new(write_tx));
  431. // let bool_v1 = Arc::new(AtomicBool::new(true));
  432. //
  433. // let params = BitgetSpotLogin {
  434. // api_key: account_info.bitget_access_key,
  435. // secret_key: account_info.bitget_secret_key,
  436. // passphrase_key: account_info.bitget_pass,
  437. // };
  438. //
  439. // let mut exchange_wss = if ["depth", "ticker"].contains(&mold) {
  440. // BitgetSpotWs::new_label(name, false, Option::from(params), BitgetSpotWsType::Public)
  441. // } else {
  442. // BitgetSpotWs::new_label(name, false, Option::from(params), BitgetSpotWsType::Private)
  443. // };
  444. // exchange_wss.set_symbols(vec![symbol_format]);
  445. // exchange_wss.set_subscribe(subscriber_type.into());
  446. //
  447. // let mold_arc = Arc::new(mold.to_string());
  448. // //读取
  449. // tokio::spawn(async move {
  450. // loop {
  451. // let mold_clone = Arc::clone(&mold_arc);
  452. // if let Some(data) = read_rx.next().await {
  453. // trace!("原始数据 data:{:?}",data);
  454. // match mold_clone.as_str() {
  455. // "depth" => {
  456. // if data.data != "" {
  457. // let result = bitget_spot_handle::handle_special_depth(data);
  458. // trace!(?result)
  459. // }
  460. // }
  461. // "ticker" => {
  462. // if data.data != "" {
  463. // let result = bitget_spot_handle::handle_special_ticker(data);
  464. // trace!(?result)
  465. // }
  466. // }
  467. // "account" => {
  468. // if data.data != "" {
  469. // let result = bitget_spot_handle::handle_account_info(data, symbol_back.clone());
  470. // trace!(?result)
  471. // }
  472. // }
  473. // "orders" => {
  474. // if data.data != "" {
  475. // let result = bitget_spot_handle::handle_order(data, dec!(1));
  476. // trace!(?result)
  477. // }
  478. // }
  479. // _ => {
  480. // error!("没有该命令!mode={}", mold_clone);
  481. // panic!("没有该命令!mode={}", mold_clone)
  482. // }
  483. // }
  484. // }
  485. // }
  486. // });
  487. // let t1 = tokio::spawn(async move {
  488. // //链接
  489. // let bool_v3_clone = Arc::clone(&bool_v1);
  490. // exchange_wss.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  491. // });
  492. // try_join!(t1).unwrap();
  493. }
  494. ExchangeEnum::OkxSwap => {
  495. let symbol_format = utils::format_symbol(symbol.to_string(), "-").to_uppercase();
  496. trace!(symbol_format);
  497. let name = format!("okx_swap@{}", symbol.to_string().to_lowercase());
  498. let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  499. let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
  500. let write_tx_am = Arc::new(Mutex::new(write_tx));
  501. let bool_v1 = Arc::new(AtomicBool::new(true));
  502. let params = OkxSwapLogin {
  503. api_key: account_info.okx_access_key,
  504. secret_key: account_info.okx_secret_key,
  505. passphrase: account_info.okx_pass,
  506. };
  507. let mut exchange_wss = if ["depth", "ticker"].contains(&mold) {
  508. OkxSwapWs::new_label(name, false, Option::from(params), OkxSwapWsType::Public)
  509. } else if ["account", "orders", "position"].contains(&mold) {
  510. OkxSwapWs::new_label(name, false, Option::from(params), OkxSwapWsType::Private)
  511. } else {
  512. OkxSwapWs::new_label(name, false, Option::from(params), OkxSwapWsType::Business)
  513. };
  514. exchange_wss.set_symbols(vec![symbol_format.clone()]);
  515. exchange_wss.set_subscribe(subscriber_type.into());
  516. let mold_arc = Arc::new(mold.to_string());
  517. tokio::spawn(async move {
  518. let mold_clone = Arc::clone(&mold_arc);
  519. loop {
  520. if let Some(data) = read_rx.next().await {
  521. trace!("原始数据 data:{:?}",data);
  522. match mold_clone.as_str() {
  523. "depth" => {
  524. if data.data != "" {
  525. let result = okx_handle::handle_special_depth(data);
  526. trace!(?result)
  527. }
  528. }
  529. "ticker" => {
  530. if data.data != "" {
  531. let result = okx_handle::handle_special_ticker(data);
  532. trace!(?result)
  533. }
  534. }
  535. "account" => {
  536. if data.data != "" {
  537. let result = okx_handle::handle_account_info(data, symbol_format.clone());
  538. trace!(?result)
  539. }
  540. }
  541. "position" => {
  542. if data.data != "" {
  543. let result = okx_handle::handle_position(data, dec!(10));
  544. trace!(?result)
  545. }
  546. }
  547. "orders" => {
  548. if data.data != "" {
  549. let result = okx_handle::handle_order(data, dec!(10));
  550. trace!(?result)
  551. }
  552. }
  553. _ => {
  554. error!("没有该命令!mode={}", mold_clone);
  555. panic!("没有该命令!mode={}", mold_clone)
  556. }
  557. }
  558. }
  559. }
  560. });
  561. let t1 = tokio::spawn(async move {
  562. //链接
  563. let bool_v3_clone = Arc::clone(&bool_v1);
  564. exchange_wss.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  565. });
  566. try_join!(t1).unwrap();
  567. }
  568. _ => {
  569. error!("该交易所不支持!test_new_exchange_wss:{:?}",exchange);
  570. panic!("该交易所不支持!test_new_exchange_wss:{:?}", exchange)
  571. }
  572. }
  573. }