exchange_test.rs 37 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715
  1. use std::collections::{BTreeMap};
  2. use std::io::{Error};
  3. use std::sync::Arc;
  4. use std::sync::atomic::AtomicBool;
  5. use rust_decimal::Decimal;
  6. use tokio::sync::mpsc::{channel, Receiver, Sender};
  7. use tokio::sync::Mutex;
  8. use tracing::{error, trace};
  9. // use exchanges::binance_spot_ws::{BinanceSpotLogin, BinanceSpotSubscribeType, BinanceSpotWs, BinanceSpotWsType};
  10. use exchanges::binance_swap_ws::{BinanceSwapLogin, BinanceSwapSubscribeType, BinanceSwapWs, BinanceSwapWsType};
  11. // use exchanges::kucoin_swap_ws::{KucoinSwapLogin, KucoinSwapSubscribeType, KucoinSwapWs, KucoinSwapWsType};
  12. // use exchanges::kucoin_spot_ws::{KucoinSpotLogin, KucoinSpotSubscribeType, KucoinSpotWs, KucoinSpotWsType};
  13. // use exchanges::gate_swap_ws::{GateSwapLogin, GateSwapSubscribeType, GateSwapWs, GateSwapWsType};
  14. // use exchanges::bitget_spot_ws::{BitgetSpotLogin, BitgetSpotSubscribeType, BitgetSpotWs, BitgetSpotWsType};
  15. // use exchanges::okx_swap_ws::{OkxSwapLogin, OkxSwapSubscribeType, OkxSwapWs, OkxSwapWsType};
  16. // use exchanges::htx_swap_ws::{HtxSwapLogin, HtxSwapSubscribeType, HtxSwapWs, HtxSwapWsType};
  17. use exchanges::response_base::ResponseData;
  18. use standard::exchange::{Exchange, ExchangeEnum};
  19. // use standard::{binance_spot_handle, Order, Platform, utils};
  20. use standard::{Order, Platform, utils};
  21. use standard::exchange_struct_handler::ExchangeStructHandler;
  22. // use standard::{kucoin_handle, Order, Platform, utils};
  23. // use standard::{kucoin_spot_handle, Order, Platform, utils};
  24. // use standard::{gate_swap_handle, handle_info, Order, Platform, utils};
  25. // use standard::{bitget_spot_handle, Order, Platform, utils};
  26. // use standard::{okx_handle, Order, Platform, utils};
  27. // use standard::{htx_swap_handle, handle_info, Order, Platform, utils};
  28. // 创建实体
  29. #[allow(dead_code)]
  30. pub async fn test_new_exchange(exchange: ExchangeEnum, symbol: &str) -> Box<dyn Platform> {
  31. utils::proxy_handle(None);
  32. let (order_sender, _order_receiver): (Sender<Order>, Receiver<Order>) = channel(1024);
  33. let (error_sender, _error_receiver): (Sender<Error>, Receiver<Error>) = channel(1024);
  34. let account_info = global::account_info::get_account_info("../test_account.toml");
  35. match exchange {
  36. ExchangeEnum::BinanceSwap => {
  37. let mut params: BTreeMap<String, String> = BTreeMap::new();
  38. let access_key = account_info.binance_access_key;
  39. let secret_key = account_info.binance_secret_key;
  40. params.insert("access_key".to_string(), access_key);
  41. params.insert("secret_key".to_string(), secret_key);
  42. Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  43. }
  44. // ExchangeEnum::BinanceSpot => {
  45. // let mut params: BTreeMap<String, String> = BTreeMap::new();
  46. // let access_key = account_info.binance_access_key;
  47. // let secret_key = account_info.binance_secret_key;
  48. // params.insert("access_key".to_string(), access_key);
  49. // params.insert("secret_key".to_string(), secret_key);
  50. // Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  51. // }
  52. ExchangeEnum::GateSwap => {
  53. let mut params: BTreeMap<String, String> = BTreeMap::new();
  54. let access_key = account_info.gate_access_key;
  55. let secret_key = account_info.gate_secret_key;
  56. params.insert("access_key".to_string(), access_key);
  57. params.insert("secret_key".to_string(), secret_key);
  58. Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  59. }
  60. // ExchangeEnum::GateSpot => {
  61. // let mut params: BTreeMap<String, String> = BTreeMap::new();
  62. // let access_key = account_info.gate_access_key;
  63. // let secret_key = account_info.gate_secret_key;
  64. // params.insert("access_key".to_string(), access_key);
  65. // params.insert("secret_key".to_string(), secret_key);
  66. // Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  67. // }
  68. // ExchangeEnum::KucoinSwap => {
  69. // let mut params: BTreeMap<String, String> = BTreeMap::new();
  70. // let access_key = account_info.kucoin_access_key;
  71. // let secret_key = account_info.kucoin_secret_key;
  72. // let pass_key = account_info.kucoin_pass;
  73. // params.insert("access_key".to_string(), access_key);
  74. // params.insert("secret_key".to_string(), secret_key);
  75. // params.insert("pass_key".to_string(), pass_key);
  76. // Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  77. // }
  78. // ExchangeEnum::KucoinSpot => {
  79. // let mut params: BTreeMap<String, String> = BTreeMap::new();
  80. // let access_key = account_info.kucoin_access_key;
  81. // let secret_key = account_info.kucoin_secret_key;
  82. // let pass_key = account_info.kucoin_pass;
  83. // params.insert("access_key".to_string(), access_key);
  84. // params.insert("secret_key".to_string(), secret_key);
  85. // params.insert("pass_key".to_string(), pass_key);
  86. // Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  87. // }
  88. // ExchangeEnum::OkxSwap => {
  89. // let mut params: BTreeMap<String, String> = BTreeMap::new();
  90. // let access_key = account_info.okx_access_key;
  91. // let secret_key = account_info.okx_secret_key;
  92. // let pass_key = account_info.okx_pass;
  93. // params.insert("access_key".to_string(), access_key);
  94. // params.insert("secret_key".to_string(), secret_key);
  95. // params.insert("pass_key".to_string(), pass_key);
  96. // Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  97. // }
  98. // ExchangeEnum::BitgetSpot => {
  99. // let mut params: BTreeMap<String, String> = BTreeMap::new();
  100. // let access_key = account_info.bitget_access_key;
  101. // let secret_key = account_info.bitget_secret_key;
  102. // let pass_key = account_info.bitget_pass;
  103. // params.insert("access_key".to_string(), access_key);
  104. // params.insert("secret_key".to_string(), secret_key);
  105. // params.insert("pass_key".to_string(), pass_key);
  106. // Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  107. // }
  108. ExchangeEnum::BitgetSwap => {
  109. let mut params: BTreeMap<String, String> = BTreeMap::new();
  110. let access_key = account_info.bitget_access_key;
  111. let secret_key = account_info.bitget_secret_key;
  112. let pass_key = account_info.bitget_pass;
  113. params.insert("access_key".to_string(), access_key);
  114. params.insert("secret_key".to_string(), secret_key);
  115. params.insert("pass_key".to_string(), pass_key);
  116. Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  117. }
  118. ExchangeEnum::BybitSwap => {
  119. let mut params: BTreeMap<String, String> = BTreeMap::new();
  120. let access_key = account_info.bybit_access_key;
  121. let secret_key = account_info.bybit_secret_key;
  122. params.insert("access_key".to_string(), access_key);
  123. params.insert("secret_key".to_string(), secret_key);
  124. Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  125. }
  126. // ExchangeEnum::HtxSwap => {
  127. // let mut params: BTreeMap<String, String> = BTreeMap::new();
  128. // let access_key = account_info.htx_access_key;
  129. // let secret_key = account_info.htx_secret_key;
  130. // let pass_key = account_info.htx_pass;
  131. // params.insert("access_key".to_string(), access_key);
  132. // params.insert("secret_key".to_string(), secret_key);
  133. // params.insert("pass_key".to_string(), pass_key);
  134. // Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  135. // }
  136. _ => {
  137. panic!("该交易所未实现!")
  138. }
  139. }
  140. }
  141. #[allow(dead_code)]
  142. pub async fn test_new_exchange_wss<T>(exchange: ExchangeEnum, symbol: &str, subscriber_type: T, mold: &str)
  143. where
  144. Vec<BinanceSwapSubscribeType>: From<T>,
  145. {
  146. utils::proxy_handle(None);
  147. let account_info = global::account_info::get_account_info("../test_account.toml");
  148. match exchange {
  149. // ExchangeEnum::BinanceSpot => {
  150. // let symbol_format = utils::format_symbol(symbol.to_string(), "").to_uppercase();
  151. // trace!(symbol_format);
  152. // let name = format!("binance_spot@{}", symbol.to_string().to_lowercase());
  153. // let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  154. // let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
  155. // let write_tx_am = Arc::new(Mutex::new(write_tx));
  156. // let is_shutdown_arc = Arc::new(AtomicBool::new(true));
  157. //
  158. // let params = BinanceSpotLogin {
  159. // api_key: account_info.binance_access_key,
  160. // api_secret: account_info.binance_secret_key,
  161. // };
  162. // let mut exchange_wss;
  163. // exchange_wss = BinanceSpotWs::new_label(name, false, Option::from(params), BinanceSpotWsType::PublicAndPrivate);
  164. // exchange_wss.set_symbols(vec![symbol_format]);
  165. // exchange_wss.set_subscribe(subscriber_type.into());
  166. //
  167. //
  168. // let mold_arc = Arc::new(mold.to_string());
  169. // //读取
  170. // tokio::spawn(async move {
  171. // let mold_clone = Arc::clone(&mold_arc);
  172. // loop {
  173. // if let Some(data) = read_rx.next().await {
  174. // trace!("原始数据 data:{:?}",data);
  175. // match mold_clone.as_str() {
  176. // "depth" => {
  177. // if data.data != "" {
  178. // let result = binance_spot_handle::handle_special_depth(data);
  179. // trace!(?result)
  180. // }
  181. // }
  182. // "ticker" => {
  183. // if data.data != "" {
  184. // let result = binance_spot_handle::handle_special_ticker(data);
  185. // trace!(?result)
  186. // }
  187. // }
  188. // _ => {
  189. // error!("没有该命令!mode={}", mold_clone);
  190. // panic!("没有该命令!mode={}", mold_clone)
  191. // }
  192. // }
  193. // }
  194. // };
  195. // });
  196. //
  197. // let t1 = tokio::spawn(async move {
  198. // //链接
  199. // let bool_v3_clone = Arc::clone(&is_shutdown_arc);
  200. // exchange_wss.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  201. // });
  202. // try_join!(t1).unwrap();
  203. // }
  204. ExchangeEnum::BinanceSwap => {
  205. let symbol_format = utils::format_symbol(symbol.to_string(), "").to_uppercase();
  206. trace!(symbol_format);
  207. let name = format!("binance_swap@{}", symbol.to_string().to_lowercase());
  208. let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  209. let write_tx_am = Arc::new(Mutex::new(write_tx));
  210. let is_shutdown_arc = Arc::new(AtomicBool::new(true));
  211. let params = BinanceSwapLogin {
  212. api_key: account_info.binance_access_key,
  213. api_secret: account_info.binance_secret_key
  214. };
  215. let mut exchange_wss = if ["depth", "ticker","trade","record"].contains(&mold) {
  216. BinanceSwapWs::new_label(name, false, Option::from(params), BinanceSwapWsType::Public).await
  217. } else {
  218. BinanceSwapWs::new_label(name, false, Option::from(params), BinanceSwapWsType::Private).await
  219. };
  220. exchange_wss.set_symbols(vec![symbol_format.clone()]);
  221. exchange_wss.set_subscribe(subscriber_type.into());
  222. let bool_v3_clone = Arc::clone(&is_shutdown_arc);
  223. let mold_clone = mold.to_string().clone();
  224. let fun = move |data: ResponseData| {
  225. let symbol_format_c = symbol_format.clone();
  226. let mold_cc = mold_clone.clone();
  227. async move {
  228. trace!("原始数据 data:{:?}",data);
  229. match mold_cc.as_str() {
  230. "depth" => {
  231. if data.data != "" {
  232. let result = ExchangeStructHandler::order_book_handle(ExchangeEnum::BinanceSwap, &data, &Decimal::ONE);
  233. trace!("-------------------------------");
  234. trace!(?result)
  235. }
  236. }
  237. "ticker" => {
  238. if data.data != "" {
  239. let result = ExchangeStructHandler::book_ticker_handle(ExchangeEnum::BinanceSwap, &data, &Decimal::ONE);
  240. trace!("-------------------------------");
  241. trace!(?result)
  242. }
  243. }
  244. "trade" => {
  245. if data.data != "" {
  246. let result = ExchangeStructHandler::trades_handle(ExchangeEnum::BinanceSwap, &data, &Decimal::ONE);
  247. trace!("-------------------------------");
  248. trace!(?result)
  249. }
  250. }
  251. "record" => {
  252. if data.data != "" {
  253. let result = ExchangeStructHandler::records_handle(ExchangeEnum::BinanceSwap, &data);
  254. trace!("-------------------------------");
  255. trace!(?result)
  256. }
  257. }
  258. "account" => {
  259. if data.data != "" {
  260. if data.channel != "ACCOUNT_UPDATE" { return; };
  261. let result = ExchangeStructHandler::account_info_handle(ExchangeEnum::BinanceSwap, &data, &symbol_format_c);
  262. trace!("-------------------------------");
  263. trace!(?result)
  264. }
  265. }
  266. "order" => {
  267. if data.data != "" {
  268. if data.channel != "ORDER_TRADE_UPDATE" { return; };
  269. let result = ExchangeStructHandler::order_handle(ExchangeEnum::BinanceSwap, &data, &Decimal::ONE);
  270. trace!("-------------------------------");
  271. trace!(?result)
  272. }
  273. }
  274. "position" => {
  275. if data.data != "" {
  276. if data.channel != "ACCOUNT_UPDATE" { return; };
  277. let result = ExchangeStructHandler::position_handle(ExchangeEnum::BinanceSwap, &data, &Decimal::ONE);
  278. trace!("-------------------------------");
  279. trace!(?result)
  280. }
  281. }
  282. _ => {
  283. error!("没有该命令!mode={}", mold_cc);
  284. panic!("没有该命令!mode={}", mold_cc)
  285. }
  286. };
  287. }
  288. };
  289. exchange_wss.ws_connect_async(bool_v3_clone, fun, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  290. }
  291. // ExchangeEnum::KucoinSwap => {
  292. // let symbol_format = format!("{}M", utils::format_symbol(symbol.to_string(), ""));
  293. // let symbol_back = utils::format_symbol(symbol.to_string(), "_");
  294. //
  295. // let name = format!("kucoin_swap@{}", symbol.to_string().to_lowercase());
  296. // let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  297. // let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
  298. // let write_tx_am = Arc::new(Mutex::new(write_tx));
  299. // let is_shutdown_arc = Arc::new(AtomicBool::new(true));
  300. //
  301. // let params = KucoinSwapLogin {
  302. // access_key: account_info.kucoin_access_key,
  303. // secret_key: account_info.kucoin_secret_key,
  304. // pass_key: account_info.kucoin_pass,
  305. // };
  306. // let mut exchange_wss;
  307. // if ["depth", "ticker"].contains(&mold) {
  308. // exchange_wss = KucoinSwapWs::new_label(name, false, Option::from(params), KucoinSwapWsType::Public).await;
  309. // } else {
  310. // exchange_wss = KucoinSwapWs::new_label(name, false, Option::from(params), KucoinSwapWsType::Private).await;
  311. // }
  312. // exchange_wss.set_symbols(vec![symbol_format]);
  313. // exchange_wss.set_subscribe(subscriber_type.into());
  314. //
  315. // let mold_arc = Arc::new(mold.to_string());
  316. // tokio::spawn(async move {
  317. // let mold_clone = Arc::clone(&mold_arc);
  318. // loop {
  319. // if let Some(data) = read_rx.next().await {
  320. // trace!("原始数据 data:{:?}",data);
  321. // match mold_clone.as_str() {
  322. // "depth" => {
  323. // let result = kucoin_handle::handle_special_depth(data);
  324. // trace!(?result)
  325. // }
  326. // "ticker" => {
  327. // let result = kucoin_handle::handle_special_ticker(data);
  328. // trace!(?result)
  329. // }
  330. // "account" => {
  331. // let result = kucoin_handle::handle_account_info(data, symbol_back.clone());
  332. // trace!(?result)
  333. // }
  334. // "position" => {
  335. // let result = kucoin_handle::handle_position(data, dec!(1));
  336. // trace!(?result)
  337. // }
  338. // "orders" => {
  339. // let result = kucoin_handle::handle_order(data, dec!(0.001));
  340. // trace!(?result)
  341. // }
  342. // _ => {
  343. // error!("没有该命令!mode={}", mold_clone);
  344. // panic!("没有该命令!mode={}", mold_clone)
  345. // }
  346. // }
  347. // }
  348. // }
  349. // });
  350. //
  351. // let t1 = tokio::spawn(async move {
  352. // //链接
  353. // let bool_v3_clone = Arc::clone(&is_shutdown_arc);
  354. // exchange_wss.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  355. // });
  356. // try_join!(t1).unwrap();
  357. // }
  358. // ExchangeEnum::KucoinSpot => {
  359. // let symbol_format = utils::format_symbol(symbol.to_string(), "-").to_uppercase();
  360. // let symbol_back = utils::format_symbol(symbol.to_string(), "_");
  361. // trace!(symbol_format);
  362. // let name = format!("kucoin_spot@{}", symbol.to_string().to_lowercase());
  363. // let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  364. // let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
  365. // let write_tx_am = Arc::new(Mutex::new(write_tx));
  366. // let is_shutdown_arc = Arc::new(AtomicBool::new(true));
  367. //
  368. // let params = KucoinSpotLogin {
  369. // access_key: account_info.kucoin_access_key,
  370. // secret_key: account_info.kucoin_secret_key,
  371. // pass_key: account_info.kucoin_pass,
  372. // };
  373. // let mut exchange_wss = if ["depth", "ticker"].contains(&mold) {
  374. // KucoinSpotWs::new_label(name, false, Option::from(params), KucoinSpotWsType::Public).await
  375. // } else {
  376. // KucoinSpotWs::new_label(name, false, Option::from(params), KucoinSpotWsType::Private).await
  377. // };
  378. // exchange_wss.set_symbols(vec![symbol_format]);
  379. // exchange_wss.set_subscribe(subscriber_type.into());
  380. //
  381. // let mold_arc = Arc::new(mold.to_string());
  382. // tokio::spawn(async move {
  383. // let mold_clone = Arc::clone(&mold_arc);
  384. // loop {
  385. // if let Some(data) = read_rx.next().await {
  386. // trace!("原始数据 data:{:?}",data);
  387. // match mold_clone.as_str() {
  388. // "depth" => {
  389. // if data.data != "" {
  390. // let result = kucoin_spot_handle::handle_special_depth(data);
  391. // trace!(?result)
  392. // }
  393. // }
  394. // "ticker" => {
  395. // if data.data != "" {
  396. // let result = kucoin_spot_handle::handle_special_ticker(data);
  397. // trace!(?result)
  398. // }
  399. // }
  400. // "account" => {
  401. // if data.data != "" {
  402. // let result = kucoin_spot_handle::handle_account_info(data, symbol_back.clone());
  403. // trace!(?result)
  404. // }
  405. // }
  406. // "orders" => {
  407. // if data.data != "" {
  408. // let result = kucoin_spot_handle::handle_order(data, dec!(1));
  409. // trace!(?result)
  410. // }
  411. // }
  412. // _ => {
  413. // error!("没有该命令!mode={}", mold_clone);
  414. // panic!("没有该命令!mode={}", mold_clone)
  415. // }
  416. // }
  417. // }
  418. // }
  419. // });
  420. // let t1 = tokio::spawn(async move {
  421. // //链接
  422. // let bool_v3_clone = Arc::clone(&is_shutdown_arc);
  423. // exchange_wss.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  424. // });
  425. // try_join!(t1).unwrap();
  426. // }
  427. ExchangeEnum::GateSwap => {
  428. // let symbol_format = utils::format_symbol(symbol.to_string(), "_").to_uppercase();
  429. // trace!(symbol_format);
  430. // let name = format!("gate_swap@{}", symbol.to_string().to_lowercase());
  431. // let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  432. // let write_tx_am = Arc::new(Mutex::new(write_tx));
  433. // let is_shutdown_arc = Arc::new(AtomicBool::new(true));
  434. //
  435. // let params = GateSwapLogin {
  436. // api_key: account_info.gate_access_key,
  437. // secret: account_info.gate_secret_key,
  438. // };
  439. // let mut exchange_wss = GateSwapWs::new_label(name, false, Option::from(params), GateSwapWsType::PublicAndPrivate("usdt".to_string()));
  440. // exchange_wss.set_symbols(vec![symbol_format.clone()]);
  441. // exchange_wss.set_subscribe(subscriber_type.into());
  442. //
  443. // let bool_v3_clone = Arc::clone(&is_shutdown_arc);
  444. // let mold_clone = mold.to_string().clone();
  445. // let fun = move |data: ResponseData| {
  446. // let symbol_format_c = symbol_format.clone();
  447. // let mold_cc = mold_clone.clone();
  448. //
  449. // async move {
  450. // trace!("原始数据 data:{:?}",data);
  451. // match mold_cc.as_str() {
  452. // "depth" => {
  453. // if data.data != "" {
  454. // let result = handle_info::format_depth(ExchangeEnum::GateSwap, &data);
  455. // trace!(?result)
  456. // }
  457. // }
  458. // "ticker" => {
  459. // if data.data != "" {
  460. // let result = gate_swap_handle::handle_book_ticker(&data);
  461. // trace!(?result)
  462. // }
  463. // }
  464. // "account" => {
  465. // if data.data != "" {
  466. // let result = gate_swap_handle::handle_account_info(&data, &symbol_format_c);
  467. // trace!(?result)
  468. // }
  469. // }
  470. // "orders" => {
  471. // if data.data != "" {
  472. // let result = gate_swap_handle::handle_order(data, dec!(1));
  473. // trace!(?result)
  474. // }
  475. // }
  476. // _ => {
  477. // error!("没有该命令!mode={}", mold_cc);
  478. // panic!("没有该命令!mode={}", mold_cc)
  479. // }
  480. // };
  481. // }
  482. // };
  483. // exchange_wss.ws_connect_async(bool_v3_clone, fun, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  484. }
  485. // ExchangeEnum::BitgetSpot => {
  486. // let symbol_format = utils::format_symbol(symbol.to_string(), "-").to_uppercase();
  487. // let symbol_back = utils::format_symbol(symbol.to_string(), "_");
  488. // trace!(symbol_format);
  489. // let name = format!("bitget_spot@{}", symbol.to_string().to_lowercase());
  490. // let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  491. // let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
  492. // let write_tx_am = Arc::new(Mutex::new(write_tx));
  493. // let is_shutdown_arc = Arc::new(AtomicBool::new(true));
  494. //
  495. // let params = BitgetSpotLogin {
  496. // api_key: account_info.bitget_access_key,
  497. // secret_key: account_info.bitget_secret_key,
  498. // passphrase_key: account_info.bitget_pass,
  499. // };
  500. //
  501. // let mut exchange_wss = if ["depth", "ticker"].contains(&mold) {
  502. // BitgetSpotWs::new_label(name, false, Option::from(params), BitgetSpotWsType::Public)
  503. // } else {
  504. // BitgetSpotWs::new_label(name, false, Option::from(params), BitgetSpotWsType::Private)
  505. // };
  506. // exchange_wss.set_symbols(vec![symbol_format]);
  507. // exchange_wss.set_subscribe(subscriber_type.into());
  508. //
  509. // let mold_arc = Arc::new(mold.to_string());
  510. // //读取
  511. // tokio::spawn(async move {
  512. // loop {
  513. // let mold_clone = Arc::clone(&mold_arc);
  514. // if let Some(data) = read_rx.next().await {
  515. // trace!("原始数据 data:{:?}",data);
  516. // match mold_clone.as_str() {
  517. // "depth" => {
  518. // if data.data != "" {
  519. // let result = bitget_spot_handle::handle_special_depth(data);
  520. // trace!(?result)
  521. // }
  522. // }
  523. // "ticker" => {
  524. // if data.data != "" {
  525. // let result = bitget_spot_handle::handle_special_ticker(data);
  526. // trace!(?result)
  527. // }
  528. // }
  529. // "account" => {
  530. // if data.data != "" {
  531. // let result = bitget_spot_handle::handle_account_info(data, symbol_back.clone());
  532. // trace!(?result)
  533. // }
  534. // }
  535. // "orders" => {
  536. // if data.data != "" {
  537. // let result = bitget_spot_handle::handle_order(data, dec!(1));
  538. // trace!(?result)
  539. // }
  540. // }
  541. // _ => {
  542. // error!("没有该命令!mode={}", mold_clone);
  543. // panic!("没有该命令!mode={}", mold_clone)
  544. // }
  545. // }
  546. // }
  547. // }
  548. // });
  549. // let t1 = tokio::spawn(async move {
  550. // //链接
  551. // let bool_v3_clone = Arc::clone(&is_shutdown_arc);
  552. // exchange_wss.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  553. // });
  554. // try_join!(t1).unwrap();
  555. // }
  556. // ExchangeEnum::OkxSwap => {
  557. // let symbol_format = utils::format_symbol(symbol.to_string(), "-").to_uppercase();
  558. // trace!(symbol_format);
  559. // let name = format!("okx_swap@{}", symbol.to_string().to_lowercase());
  560. // let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  561. // let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
  562. // let write_tx_am = Arc::new(Mutex::new(write_tx));
  563. // let is_shutdown_arc = Arc::new(AtomicBool::new(true));
  564. //
  565. // let params = OkxSwapLogin {
  566. // api_key: account_info.okx_access_key,
  567. // secret_key: account_info.okx_secret_key,
  568. // passphrase: account_info.okx_pass,
  569. // };
  570. //
  571. // let mut exchange_wss = if ["depth", "ticker"].contains(&mold) {
  572. // OkxSwapWs::new_label(name, false, Option::from(params), OkxSwapWsType::Public)
  573. // } else if ["account", "orders", "position"].contains(&mold) {
  574. // OkxSwapWs::new_label(name, false, Option::from(params), OkxSwapWsType::Private)
  575. // } else {
  576. // OkxSwapWs::new_label(name, false, Option::from(params), OkxSwapWsType::Business)
  577. // };
  578. //
  579. // exchange_wss.set_symbols(vec![symbol_format.clone()]);
  580. // exchange_wss.set_subscribe(subscriber_type.into());
  581. //
  582. // let mold_arc = Arc::new(mold.to_string());
  583. // tokio::spawn(async move {
  584. // let mold_clone = Arc::clone(&mold_arc);
  585. // loop {
  586. // if let Some(data) = read_rx.next().await {
  587. // trace!("原始数据 data:{:?}",data);
  588. // match mold_clone.as_str() {
  589. // "depth" => {
  590. // if data.data != "" {
  591. // let result = okx_handle::handle_special_depth(data);
  592. // trace!(?result)
  593. // }
  594. // }
  595. // "ticker" => {
  596. // if data.data != "" {
  597. // let result = okx_handle::handle_special_ticker(data);
  598. // trace!(?result)
  599. // }
  600. // }
  601. // "account" => {
  602. // if data.data != "" {
  603. // let result = okx_handle::handle_account_info(data, symbol_format.clone());
  604. // trace!(?result)
  605. // }
  606. // }
  607. // "position" => {
  608. // if data.data != "" {
  609. // let result = okx_handle::handle_position(data, dec!(10));
  610. // trace!(?result)
  611. // }
  612. // }
  613. // "orders" => {
  614. // if data.data != "" {
  615. // let result = okx_handle::handle_order(data, dec!(10));
  616. // trace!(?result)
  617. // }
  618. // }
  619. // _ => {
  620. // error!("没有该命令!mode={}", mold_clone);
  621. // panic!("没有该命令!mode={}", mold_clone)
  622. // }
  623. // }
  624. // }
  625. // }
  626. // });
  627. //
  628. // let t1 = tokio::spawn(async move {
  629. // //链接
  630. // let bool_v3_clone = Arc::clone(&is_shutdown_arc);
  631. // exchange_wss.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  632. // });
  633. // try_join!(t1).unwrap();
  634. // }
  635. // ExchangeEnum::HtxSwap => {
  636. // let symbol_format = utils::format_symbol(symbol.to_string(), "-").to_uppercase();
  637. // trace!(symbol_format);
  638. // let name = format!("htx_swap@{}", symbol.to_string().to_lowercase());
  639. // let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  640. // let write_tx_am = Arc::new(Mutex::new(write_tx));
  641. // let is_shutdown_arc = Arc::new(AtomicBool::new(true));
  642. //
  643. // let params = HtxSwapLogin {
  644. // api_key: account_info.htx_access_key,
  645. // secret: account_info.htx_secret_key,
  646. // };
  647. //
  648. // let htx_wss_type = match mold.to_string().clone().as_str() {
  649. // "depth" => HtxSwapWsType::Public,
  650. // _ => HtxSwapWsType::Private
  651. // };
  652. //
  653. // let mut exchange_wss = HtxSwapWs::new_label(name, Option::from(params), htx_wss_type);
  654. // exchange_wss.set_symbols(vec![symbol_format.clone()]);
  655. // exchange_wss.set_subscribe(subscriber_type.into());
  656. // let bool_v3_clone = Arc::clone(&is_shutdown_arc);
  657. // let mold_clone = mold.to_string().clone();
  658. // let fun = move |data: ResponseData| {
  659. // let symbol_format_c = symbol_format.clone();
  660. // let mold_cc = mold_clone.clone();
  661. //
  662. // async move {
  663. // trace!("原始数据 data:{:?}",data);
  664. // match mold_cc.as_str() {
  665. // "depth" => {
  666. // if data.data != "" {
  667. // let result = handle_info::format_depth(ExchangeEnum::HtxSwap, &data);
  668. // trace!("-------------------------------");
  669. // trace!(?result)
  670. // }
  671. // }
  672. // "position" => {
  673. // if data.data != "" {
  674. // let result = htx_swap_handle::handle_position(&data, &dec!(10));
  675. // trace!("-------------------------------");
  676. // trace!(?result)
  677. // }
  678. // }
  679. // "account" => {
  680. // if data.data != "" {
  681. // let result = htx_swap_handle::handle_account_info(&data, &symbol_format_c);
  682. // trace!("-------------------------------");
  683. // trace!(?result)
  684. // }
  685. // }
  686. // "orders" => {
  687. // println!("{:?}", data);
  688. // if data.data != "" {
  689. // let result = htx_swap_handle::handle_order(data, dec!(10));
  690. // trace!("-------------------------------");
  691. // trace!(?result)
  692. // }
  693. // }
  694. // _ => {
  695. // error!("没有该命令!mode={}", mold_cc);
  696. // panic!("没有该命令!mode={}", mold_cc)
  697. // }
  698. // };
  699. // }
  700. // };
  701. // exchange_wss.ws_connect_async(bool_v3_clone, fun, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  702. // }
  703. _ => {
  704. error!("该交易所不支持!test_new_exchange_wss:{:?}", exchange);
  705. panic!("该交易所不支持!test_new_exchange_wss:{:?}", exchange)
  706. }
  707. }
  708. }