exchange_test.rs 35 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677
  1. use std::collections::{BTreeMap};
  2. use std::io::{Error};
  3. use std::sync::Arc;
  4. use std::sync::atomic::AtomicBool;
  5. use rust_decimal_macros::dec;
  6. use tokio::sync::mpsc::{channel, Receiver, Sender};
  7. use tokio::sync::Mutex;
  8. use tracing::{error, trace};
  9. // use exchanges::binance_spot_ws::{BinanceSpotLogin, BinanceSpotSubscribeType, BinanceSpotWs, BinanceSpotWsType};
  10. // use exchanges::binance_swap_ws::{BinanceSwapLogin, BinanceSwapSubscribeType, BinanceSwapWs, BinanceSwapWsType};
  11. // use exchanges::kucoin_swap_ws::{KucoinSwapLogin, KucoinSwapSubscribeType, KucoinSwapWs, KucoinSwapWsType};
  12. // use exchanges::kucoin_spot_ws::{KucoinSpotLogin, KucoinSpotSubscribeType, KucoinSpotWs, KucoinSpotWsType};
  13. // use exchanges::gate_swap_ws::{GateSwapLogin, GateSwapSubscribeType, GateSwapWs, GateSwapWsType};
  14. // use exchanges::bitget_spot_ws::{BitgetSpotLogin, BitgetSpotSubscribeType, BitgetSpotWs, BitgetSpotWsType};
  15. // use exchanges::okx_swap_ws::{OkxSwapLogin, OkxSwapSubscribeType, OkxSwapWs, OkxSwapWsType};
  16. use exchanges::htx_swap_ws::{HtxSwapLogin, HtxSwapSubscribeType, HtxSwapWs, HtxSwapWsType};
  17. use exchanges::response_base::ResponseData;
  18. use standard::exchange::{Exchange, ExchangeEnum};
  19. // use standard::{binance_spot_handle, Order, Platform, utils};
  20. // use standard::{binance_handle, Order, Platform, utils};
  21. // use standard::{kucoin_handle, Order, Platform, utils};
  22. // use standard::{kucoin_spot_handle, Order, Platform, utils};
  23. // use standard::{gate_swap_handle, handle_info, Order, Platform, utils};
  24. // use standard::{bitget_spot_handle, Order, Platform, utils};
  25. // use standard::{okx_handle, Order, Platform, utils};
  26. use standard::{htx_swap_handle, handle_info, Order, Platform, utils};
  27. // 创建实体
  28. #[allow(dead_code)]
  29. pub async fn test_new_exchange(exchange: ExchangeEnum, symbol: &str) -> Box<dyn Platform> {
  30. utils::proxy_handle();
  31. let (order_sender, _order_receiver): (Sender<Order>, Receiver<Order>) = channel(1024);
  32. let (error_sender, _error_receiver): (Sender<Error>, Receiver<Error>) = channel(1024);
  33. let account_info = global::account_info::get_account_info("../test_account.toml");
  34. match exchange {
  35. ExchangeEnum::BinanceSwap => {
  36. let mut params: BTreeMap<String, String> = BTreeMap::new();
  37. let access_key = account_info.binance_access_key;
  38. let secret_key = account_info.binance_secret_key;
  39. params.insert("access_key".to_string(), access_key);
  40. params.insert("secret_key".to_string(), secret_key);
  41. Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  42. }
  43. // ExchangeEnum::BinanceSpot => {
  44. // let mut params: BTreeMap<String, String> = BTreeMap::new();
  45. // let access_key = account_info.binance_access_key;
  46. // let secret_key = account_info.binance_secret_key;
  47. // params.insert("access_key".to_string(), access_key);
  48. // params.insert("secret_key".to_string(), secret_key);
  49. // Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  50. // }
  51. ExchangeEnum::GateSwap => {
  52. let mut params: BTreeMap<String, String> = BTreeMap::new();
  53. let access_key = account_info.gate_access_key;
  54. let secret_key = account_info.gate_secret_key;
  55. params.insert("access_key".to_string(), access_key);
  56. params.insert("secret_key".to_string(), secret_key);
  57. Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  58. }
  59. // ExchangeEnum::GateSpot => {
  60. // let mut params: BTreeMap<String, String> = BTreeMap::new();
  61. // let access_key = account_info.gate_access_key;
  62. // let secret_key = account_info.gate_secret_key;
  63. // params.insert("access_key".to_string(), access_key);
  64. // params.insert("secret_key".to_string(), secret_key);
  65. // Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  66. // }
  67. ExchangeEnum::KucoinSwap => {
  68. let mut params: BTreeMap<String, String> = BTreeMap::new();
  69. let access_key = account_info.kucoin_access_key;
  70. let secret_key = account_info.kucoin_secret_key;
  71. let pass_key = account_info.kucoin_pass;
  72. params.insert("access_key".to_string(), access_key);
  73. params.insert("secret_key".to_string(), secret_key);
  74. params.insert("pass_key".to_string(), pass_key);
  75. Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  76. }
  77. // ExchangeEnum::KucoinSpot => {
  78. // let mut params: BTreeMap<String, String> = BTreeMap::new();
  79. // let access_key = account_info.kucoin_access_key;
  80. // let secret_key = account_info.kucoin_secret_key;
  81. // let pass_key = account_info.kucoin_pass;
  82. // params.insert("access_key".to_string(), access_key);
  83. // params.insert("secret_key".to_string(), secret_key);
  84. // params.insert("pass_key".to_string(), pass_key);
  85. // Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  86. // }
  87. // ExchangeEnum::OkxSwap => {
  88. // let mut params: BTreeMap<String, String> = BTreeMap::new();
  89. // let access_key = account_info.okx_access_key;
  90. // let secret_key = account_info.okx_secret_key;
  91. // let pass_key = account_info.okx_pass;
  92. // params.insert("access_key".to_string(), access_key);
  93. // params.insert("secret_key".to_string(), secret_key);
  94. // params.insert("pass_key".to_string(), pass_key);
  95. // Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  96. // }
  97. // ExchangeEnum::BitgetSpot => {
  98. // let mut params: BTreeMap<String, String> = BTreeMap::new();
  99. // let access_key = account_info.bitget_access_key;
  100. // let secret_key = account_info.bitget_secret_key;
  101. // let pass_key = account_info.bitget_pass;
  102. // params.insert("access_key".to_string(), access_key);
  103. // params.insert("secret_key".to_string(), secret_key);
  104. // params.insert("pass_key".to_string(), pass_key);
  105. // Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  106. // }
  107. ExchangeEnum::BitgetSwap => {
  108. let mut params: BTreeMap<String, String> = BTreeMap::new();
  109. let access_key = account_info.bitget_access_key;
  110. let secret_key = account_info.bitget_secret_key;
  111. let pass_key = account_info.bitget_pass;
  112. params.insert("access_key".to_string(), access_key);
  113. params.insert("secret_key".to_string(), secret_key);
  114. params.insert("pass_key".to_string(), pass_key);
  115. Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  116. }
  117. ExchangeEnum::BybitSwap => {
  118. let mut params: BTreeMap<String, String> = BTreeMap::new();
  119. let access_key = account_info.bybit_access_key;
  120. let secret_key = account_info.bybit_secret_key;
  121. let pass_key = account_info.bybit_pass;
  122. params.insert("access_key".to_string(), access_key);
  123. params.insert("secret_key".to_string(), secret_key);
  124. params.insert("pass_key".to_string(), pass_key);
  125. Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  126. }
  127. ExchangeEnum::HtxSwap => {
  128. let mut params: BTreeMap<String, String> = BTreeMap::new();
  129. let access_key = account_info.htx_access_key;
  130. let secret_key = account_info.htx_secret_key;
  131. let pass_key = account_info.htx_pass;
  132. params.insert("access_key".to_string(), access_key);
  133. params.insert("secret_key".to_string(), secret_key);
  134. params.insert("pass_key".to_string(), pass_key);
  135. Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  136. }
  137. _ => {
  138. panic!("该交易所未实现!")
  139. }
  140. }
  141. }
  142. #[allow(dead_code)]
  143. pub async fn test_new_exchange_wss<T>(exchange: ExchangeEnum, symbol: &str, subscriber_type: T, mold: &str) where Vec<HtxSwapSubscribeType>: From<T> {
  144. utils::proxy_handle();
  145. let account_info = global::account_info::get_account_info("../test_account.toml");
  146. match exchange {
  147. // ExchangeEnum::BinanceSpot => {
  148. // let symbol_format = utils::format_symbol(symbol.to_string(), "").to_uppercase();
  149. // trace!(symbol_format);
  150. // let name = format!("binance_spot@{}", symbol.to_string().to_lowercase());
  151. // let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  152. // let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
  153. // let write_tx_am = Arc::new(Mutex::new(write_tx));
  154. // let is_shutdown_arc = Arc::new(AtomicBool::new(true));
  155. //
  156. // let params = BinanceSpotLogin {
  157. // api_key: account_info.binance_access_key,
  158. // api_secret: account_info.binance_secret_key,
  159. // };
  160. // let mut exchange_wss;
  161. // exchange_wss = BinanceSpotWs::new_label(name, false, Option::from(params), BinanceSpotWsType::PublicAndPrivate);
  162. // exchange_wss.set_symbols(vec![symbol_format]);
  163. // exchange_wss.set_subscribe(subscriber_type.into());
  164. //
  165. //
  166. // let mold_arc = Arc::new(mold.to_string());
  167. // //读取
  168. // tokio::spawn(async move {
  169. // let mold_clone = Arc::clone(&mold_arc);
  170. // loop {
  171. // if let Some(data) = read_rx.next().await {
  172. // trace!("原始数据 data:{:?}",data);
  173. // match mold_clone.as_str() {
  174. // "depth" => {
  175. // if data.data != "" {
  176. // let result = binance_spot_handle::handle_special_depth(data);
  177. // trace!(?result)
  178. // }
  179. // }
  180. // "ticker" => {
  181. // if data.data != "" {
  182. // let result = binance_spot_handle::handle_special_ticker(data);
  183. // trace!(?result)
  184. // }
  185. // }
  186. // _ => {
  187. // error!("没有该命令!mode={}", mold_clone);
  188. // panic!("没有该命令!mode={}", mold_clone)
  189. // }
  190. // }
  191. // }
  192. // };
  193. // });
  194. //
  195. // let t1 = tokio::spawn(async move {
  196. // //链接
  197. // let bool_v3_clone = Arc::clone(&is_shutdown_arc);
  198. // exchange_wss.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  199. // });
  200. // try_join!(t1).unwrap();
  201. // }
  202. ExchangeEnum::BinanceSwap => {
  203. // let symbol_format = utils::format_symbol(symbol.to_string(), "").to_uppercase();
  204. // trace!(symbol_format);
  205. // let name = format!("binance_swap@{}", symbol.to_string().to_lowercase());
  206. // let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  207. // let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
  208. // let write_tx_am = Arc::new(Mutex::new(write_tx));
  209. // let is_shutdown_arc = Arc::new(AtomicBool::new(true));
  210. //
  211. // let params = BinanceSwapLogin {
  212. // api_key: account_info.binance_access_key,
  213. // api_secret: account_info.binance_secret_key,
  214. // };
  215. // let mut exchange_wss;
  216. // exchange_wss = BinanceSwapWs::new_label(name, false, Option::from(params), BinanceSwapWsType::PublicAndPrivate);
  217. // exchange_wss.set_symbols(vec![symbol_format]);
  218. // exchange_wss.set_subscribe(subscriber_type.into());
  219. //
  220. //
  221. // let mold_arc = Arc::new(mold.to_string());
  222. // //读取
  223. // tokio::spawn(async move {
  224. // let mold_clone = Arc::clone(&mold_arc);
  225. // loop {
  226. // if let Some(data) = read_rx.next().await {
  227. // trace!("原始数据 data:{:?}",data);
  228. // match mold_clone.as_str() {
  229. // "depth" => {
  230. // if data.data != "" {
  231. // let result = binance_handle::handle_special_depth(data);
  232. // trace!(?result)
  233. // }
  234. // }
  235. // "ticker" => {
  236. // if data.data != "" {
  237. // let result = binance_handle::handle_special_ticker(data);
  238. // trace!(?result)
  239. // }
  240. // }
  241. // _ => {
  242. // error!("没有该命令!mode={}", mold_clone);
  243. // panic!("没有该命令!mode={}", mold_clone)
  244. // }
  245. // }
  246. // }
  247. // };
  248. // });
  249. //
  250. // let t1 = tokio::spawn(async move {
  251. // //链接
  252. // let bool_v3_clone = Arc::clone(&is_shutdown_arc);
  253. // exchange_wss.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  254. // });
  255. // try_join!(t1).unwrap();
  256. }
  257. ExchangeEnum::KucoinSwap => {
  258. // let symbol_format = format!("{}M", utils::format_symbol(symbol.to_string(), ""));
  259. // let symbol_back = utils::format_symbol(symbol.to_string(), "_");
  260. //
  261. // let name = format!("kucoin_swap@{}", symbol.to_string().to_lowercase());
  262. // let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  263. // let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
  264. // let write_tx_am = Arc::new(Mutex::new(write_tx));
  265. // let is_shutdown_arc = Arc::new(AtomicBool::new(true));
  266. //
  267. // let params = KucoinSwapLogin {
  268. // access_key: account_info.kucoin_access_key,
  269. // secret_key: account_info.kucoin_secret_key,
  270. // pass_key: account_info.kucoin_pass,
  271. // };
  272. // let mut exchange_wss;
  273. // if ["depth", "ticker"].contains(&mold) {
  274. // exchange_wss = KucoinSwapWs::new_label(name, false, Option::from(params), KucoinSwapWsType::Public).await;
  275. // } else {
  276. // exchange_wss = KucoinSwapWs::new_label(name, false, Option::from(params), KucoinSwapWsType::Private).await;
  277. // }
  278. // exchange_wss.set_symbols(vec![symbol_format]);
  279. // exchange_wss.set_subscribe(subscriber_type.into());
  280. //
  281. // let mold_arc = Arc::new(mold.to_string());
  282. // tokio::spawn(async move {
  283. // let mold_clone = Arc::clone(&mold_arc);
  284. // loop {
  285. // if let Some(data) = read_rx.next().await {
  286. // trace!("原始数据 data:{:?}",data);
  287. // match mold_clone.as_str() {
  288. // "depth" => {
  289. // let result = kucoin_handle::handle_special_depth(data);
  290. // trace!(?result)
  291. // }
  292. // "ticker" => {
  293. // let result = kucoin_handle::handle_special_ticker(data);
  294. // trace!(?result)
  295. // }
  296. // "account" => {
  297. // let result = kucoin_handle::handle_account_info(data, symbol_back.clone());
  298. // trace!(?result)
  299. // }
  300. // "position" => {
  301. // let result = kucoin_handle::handle_position(data, dec!(1));
  302. // trace!(?result)
  303. // }
  304. // "orders" => {
  305. // let result = kucoin_handle::handle_order(data, dec!(0.001));
  306. // trace!(?result)
  307. // }
  308. // _ => {
  309. // error!("没有该命令!mode={}", mold_clone);
  310. // panic!("没有该命令!mode={}", mold_clone)
  311. // }
  312. // }
  313. // }
  314. // }
  315. // });
  316. //
  317. // let t1 = tokio::spawn(async move {
  318. // //链接
  319. // let bool_v3_clone = Arc::clone(&is_shutdown_arc);
  320. // exchange_wss.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  321. // });
  322. // try_join!(t1).unwrap();
  323. }
  324. // ExchangeEnum::KucoinSpot => {
  325. // let symbol_format = utils::format_symbol(symbol.to_string(), "-").to_uppercase();
  326. // let symbol_back = utils::format_symbol(symbol.to_string(), "_");
  327. // trace!(symbol_format);
  328. // let name = format!("kucoin_spot@{}", symbol.to_string().to_lowercase());
  329. // let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  330. // let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
  331. // let write_tx_am = Arc::new(Mutex::new(write_tx));
  332. // let is_shutdown_arc = Arc::new(AtomicBool::new(true));
  333. //
  334. // let params = KucoinSpotLogin {
  335. // access_key: account_info.kucoin_access_key,
  336. // secret_key: account_info.kucoin_secret_key,
  337. // pass_key: account_info.kucoin_pass,
  338. // };
  339. // let mut exchange_wss = if ["depth", "ticker"].contains(&mold) {
  340. // KucoinSpotWs::new_label(name, false, Option::from(params), KucoinSpotWsType::Public).await
  341. // } else {
  342. // KucoinSpotWs::new_label(name, false, Option::from(params), KucoinSpotWsType::Private).await
  343. // };
  344. // exchange_wss.set_symbols(vec![symbol_format]);
  345. // exchange_wss.set_subscribe(subscriber_type.into());
  346. //
  347. // let mold_arc = Arc::new(mold.to_string());
  348. // tokio::spawn(async move {
  349. // let mold_clone = Arc::clone(&mold_arc);
  350. // loop {
  351. // if let Some(data) = read_rx.next().await {
  352. // trace!("原始数据 data:{:?}",data);
  353. // match mold_clone.as_str() {
  354. // "depth" => {
  355. // if data.data != "" {
  356. // let result = kucoin_spot_handle::handle_special_depth(data);
  357. // trace!(?result)
  358. // }
  359. // }
  360. // "ticker" => {
  361. // if data.data != "" {
  362. // let result = kucoin_spot_handle::handle_special_ticker(data);
  363. // trace!(?result)
  364. // }
  365. // }
  366. // "account" => {
  367. // if data.data != "" {
  368. // let result = kucoin_spot_handle::handle_account_info(data, symbol_back.clone());
  369. // trace!(?result)
  370. // }
  371. // }
  372. // "orders" => {
  373. // if data.data != "" {
  374. // let result = kucoin_spot_handle::handle_order(data, dec!(1));
  375. // trace!(?result)
  376. // }
  377. // }
  378. // _ => {
  379. // error!("没有该命令!mode={}", mold_clone);
  380. // panic!("没有该命令!mode={}", mold_clone)
  381. // }
  382. // }
  383. // }
  384. // }
  385. // });
  386. // let t1 = tokio::spawn(async move {
  387. // //链接
  388. // let bool_v3_clone = Arc::clone(&is_shutdown_arc);
  389. // exchange_wss.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  390. // });
  391. // try_join!(t1).unwrap();
  392. // }
  393. ExchangeEnum::GateSwap => {
  394. // let symbol_format = utils::format_symbol(symbol.to_string(), "_").to_uppercase();
  395. // trace!(symbol_format);
  396. // let name = format!("gate_swap@{}", symbol.to_string().to_lowercase());
  397. // let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  398. // let write_tx_am = Arc::new(Mutex::new(write_tx));
  399. // let is_shutdown_arc = Arc::new(AtomicBool::new(true));
  400. //
  401. // let params = GateSwapLogin {
  402. // api_key: account_info.gate_access_key,
  403. // secret: account_info.gate_secret_key,
  404. // };
  405. // let mut exchange_wss = GateSwapWs::new_label(name, false, Option::from(params), GateSwapWsType::PublicAndPrivate("usdt".to_string()));
  406. // exchange_wss.set_symbols(vec![symbol_format.clone()]);
  407. // exchange_wss.set_subscribe(subscriber_type.into());
  408. //
  409. // let bool_v3_clone = Arc::clone(&is_shutdown_arc);
  410. // let mold_clone = mold.to_string().clone();
  411. // let fun = move |data: ResponseData| {
  412. // let symbol_format_c = symbol_format.clone();
  413. // let mold_cc = mold_clone.clone();
  414. //
  415. // async move {
  416. // trace!("原始数据 data:{:?}",data);
  417. // match mold_cc.as_str() {
  418. // "depth" => {
  419. // if data.data != "" {
  420. // let result = handle_info::format_depth(ExchangeEnum::GateSwap, &data);
  421. // trace!(?result)
  422. // }
  423. // }
  424. // "ticker" => {
  425. // if data.data != "" {
  426. // let result = gate_swap_handle::handle_book_ticker(&data);
  427. // trace!(?result)
  428. // }
  429. // }
  430. // "account" => {
  431. // if data.data != "" {
  432. // let result = gate_swap_handle::handle_account_info(&data, &symbol_format_c);
  433. // trace!(?result)
  434. // }
  435. // }
  436. // "orders" => {
  437. // if data.data != "" {
  438. // let result = gate_swap_handle::handle_order(data, dec!(1));
  439. // trace!(?result)
  440. // }
  441. // }
  442. // _ => {
  443. // error!("没有该命令!mode={}", mold_cc);
  444. // panic!("没有该命令!mode={}", mold_cc)
  445. // }
  446. // };
  447. // }
  448. // };
  449. // exchange_wss.ws_connect_async(bool_v3_clone, fun, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  450. }
  451. // ExchangeEnum::BitgetSpot => {
  452. // let symbol_format = utils::format_symbol(symbol.to_string(), "-").to_uppercase();
  453. // let symbol_back = utils::format_symbol(symbol.to_string(), "_");
  454. // trace!(symbol_format);
  455. // let name = format!("bitget_spot@{}", symbol.to_string().to_lowercase());
  456. // let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  457. // let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
  458. // let write_tx_am = Arc::new(Mutex::new(write_tx));
  459. // let is_shutdown_arc = Arc::new(AtomicBool::new(true));
  460. //
  461. // let params = BitgetSpotLogin {
  462. // api_key: account_info.bitget_access_key,
  463. // secret_key: account_info.bitget_secret_key,
  464. // passphrase_key: account_info.bitget_pass,
  465. // };
  466. //
  467. // let mut exchange_wss = if ["depth", "ticker"].contains(&mold) {
  468. // BitgetSpotWs::new_label(name, false, Option::from(params), BitgetSpotWsType::Public)
  469. // } else {
  470. // BitgetSpotWs::new_label(name, false, Option::from(params), BitgetSpotWsType::Private)
  471. // };
  472. // exchange_wss.set_symbols(vec![symbol_format]);
  473. // exchange_wss.set_subscribe(subscriber_type.into());
  474. //
  475. // let mold_arc = Arc::new(mold.to_string());
  476. // //读取
  477. // tokio::spawn(async move {
  478. // loop {
  479. // let mold_clone = Arc::clone(&mold_arc);
  480. // if let Some(data) = read_rx.next().await {
  481. // trace!("原始数据 data:{:?}",data);
  482. // match mold_clone.as_str() {
  483. // "depth" => {
  484. // if data.data != "" {
  485. // let result = bitget_spot_handle::handle_special_depth(data);
  486. // trace!(?result)
  487. // }
  488. // }
  489. // "ticker" => {
  490. // if data.data != "" {
  491. // let result = bitget_spot_handle::handle_special_ticker(data);
  492. // trace!(?result)
  493. // }
  494. // }
  495. // "account" => {
  496. // if data.data != "" {
  497. // let result = bitget_spot_handle::handle_account_info(data, symbol_back.clone());
  498. // trace!(?result)
  499. // }
  500. // }
  501. // "orders" => {
  502. // if data.data != "" {
  503. // let result = bitget_spot_handle::handle_order(data, dec!(1));
  504. // trace!(?result)
  505. // }
  506. // }
  507. // _ => {
  508. // error!("没有该命令!mode={}", mold_clone);
  509. // panic!("没有该命令!mode={}", mold_clone)
  510. // }
  511. // }
  512. // }
  513. // }
  514. // });
  515. // let t1 = tokio::spawn(async move {
  516. // //链接
  517. // let bool_v3_clone = Arc::clone(&is_shutdown_arc);
  518. // exchange_wss.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  519. // });
  520. // try_join!(t1).unwrap();
  521. // }
  522. // ExchangeEnum::OkxSwap => {
  523. // let symbol_format = utils::format_symbol(symbol.to_string(), "-").to_uppercase();
  524. // trace!(symbol_format);
  525. // let name = format!("okx_swap@{}", symbol.to_string().to_lowercase());
  526. // let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  527. // let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
  528. // let write_tx_am = Arc::new(Mutex::new(write_tx));
  529. // let is_shutdown_arc = Arc::new(AtomicBool::new(true));
  530. //
  531. // let params = OkxSwapLogin {
  532. // api_key: account_info.okx_access_key,
  533. // secret_key: account_info.okx_secret_key,
  534. // passphrase: account_info.okx_pass,
  535. // };
  536. //
  537. // let mut exchange_wss = if ["depth", "ticker"].contains(&mold) {
  538. // OkxSwapWs::new_label(name, false, Option::from(params), OkxSwapWsType::Public)
  539. // } else if ["account", "orders", "position"].contains(&mold) {
  540. // OkxSwapWs::new_label(name, false, Option::from(params), OkxSwapWsType::Private)
  541. // } else {
  542. // OkxSwapWs::new_label(name, false, Option::from(params), OkxSwapWsType::Business)
  543. // };
  544. //
  545. // exchange_wss.set_symbols(vec![symbol_format.clone()]);
  546. // exchange_wss.set_subscribe(subscriber_type.into());
  547. //
  548. // let mold_arc = Arc::new(mold.to_string());
  549. // tokio::spawn(async move {
  550. // let mold_clone = Arc::clone(&mold_arc);
  551. // loop {
  552. // if let Some(data) = read_rx.next().await {
  553. // trace!("原始数据 data:{:?}",data);
  554. // match mold_clone.as_str() {
  555. // "depth" => {
  556. // if data.data != "" {
  557. // let result = okx_handle::handle_special_depth(data);
  558. // trace!(?result)
  559. // }
  560. // }
  561. // "ticker" => {
  562. // if data.data != "" {
  563. // let result = okx_handle::handle_special_ticker(data);
  564. // trace!(?result)
  565. // }
  566. // }
  567. // "account" => {
  568. // if data.data != "" {
  569. // let result = okx_handle::handle_account_info(data, symbol_format.clone());
  570. // trace!(?result)
  571. // }
  572. // }
  573. // "position" => {
  574. // if data.data != "" {
  575. // let result = okx_handle::handle_position(data, dec!(10));
  576. // trace!(?result)
  577. // }
  578. // }
  579. // "orders" => {
  580. // if data.data != "" {
  581. // let result = okx_handle::handle_order(data, dec!(10));
  582. // trace!(?result)
  583. // }
  584. // }
  585. // _ => {
  586. // error!("没有该命令!mode={}", mold_clone);
  587. // panic!("没有该命令!mode={}", mold_clone)
  588. // }
  589. // }
  590. // }
  591. // }
  592. // });
  593. //
  594. // let t1 = tokio::spawn(async move {
  595. // //链接
  596. // let bool_v3_clone = Arc::clone(&is_shutdown_arc);
  597. // exchange_wss.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  598. // });
  599. // try_join!(t1).unwrap();
  600. // }
  601. ExchangeEnum::HtxSwap => {
  602. let symbol_format = utils::format_symbol(symbol.to_string(), "-").to_uppercase();
  603. trace!(symbol_format);
  604. let name = format!("htx_swap@{}", symbol.to_string().to_lowercase());
  605. let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  606. let write_tx_am = Arc::new(Mutex::new(write_tx));
  607. let is_shutdown_arc = Arc::new(AtomicBool::new(true));
  608. let params = HtxSwapLogin {
  609. api_key: account_info.htx_access_key,
  610. secret: account_info.htx_secret_key,
  611. };
  612. let htx_wss_type = match mold.to_string().clone().as_str() {
  613. "depth" => HtxSwapWsType::Public,
  614. _ => HtxSwapWsType::Private
  615. };
  616. let mut exchange_wss = HtxSwapWs::new_label(name, Option::from(params), htx_wss_type);
  617. exchange_wss.set_symbols(vec![symbol_format.clone()]);
  618. exchange_wss.set_subscribe(subscriber_type.into());
  619. let bool_v3_clone = Arc::clone(&is_shutdown_arc);
  620. let mold_clone = mold.to_string().clone();
  621. let fun = move |data: ResponseData| {
  622. let symbol_format_c = symbol_format.clone();
  623. let mold_cc = mold_clone.clone();
  624. async move {
  625. trace!("原始数据 data:{:?}",data);
  626. match mold_cc.as_str() {
  627. "depth" => {
  628. if data.data != "" {
  629. let result = handle_info::format_depth(ExchangeEnum::HtxSwap, &data);
  630. trace!("-------------------------------");
  631. trace!(?result)
  632. }
  633. }
  634. "position" => {
  635. if data.data != "" {
  636. let result = htx_swap_handle::handle_position(&data, &dec!(10));
  637. trace!("-------------------------------");
  638. trace!(?result)
  639. }
  640. }
  641. "account" => {
  642. if data.data != "" {
  643. let result = htx_swap_handle::handle_account_info(&data, &symbol_format_c);
  644. trace!("-------------------------------");
  645. trace!(?result)
  646. }
  647. }
  648. "orders" => {
  649. println!("{:?}", data);
  650. if data.data != "" {
  651. let result = htx_swap_handle::handle_order(data, dec!(10));
  652. trace!("-------------------------------");
  653. trace!(?result)
  654. }
  655. }
  656. _ => {
  657. error!("没有该命令!mode={}", mold_cc);
  658. panic!("没有该命令!mode={}", mold_cc)
  659. }
  660. };
  661. }
  662. };
  663. exchange_wss.ws_connect_async(bool_v3_clone, fun, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  664. }
  665. _ => {
  666. error!("该交易所不支持!test_new_exchange_wss:{:?}", exchange);
  667. panic!("该交易所不支持!test_new_exchange_wss:{:?}", exchange)
  668. }
  669. }
  670. }