exchange_test.rs 36 KB


  1. use std::collections::{BTreeMap};
  2. use std::io::{Error};
  3. use std::sync::Arc;
  4. use std::sync::atomic::AtomicBool;
  5. use rust_decimal_macros::dec;
  6. use serde::de::Unexpected::Option;
  7. use tokio::sync::mpsc::{channel, Receiver, Sender};
  8. use tokio::sync::Mutex;
  9. use tracing::{error, trace};
  10. // use exchanges::binance_spot_ws::{BinanceSpotLogin, BinanceSpotSubscribeType, BinanceSpotWs, BinanceSpotWsType};
  11. // use exchanges::binance_swap_ws::{BinanceSwapLogin, BinanceSwapSubscribeType, BinanceSwapWs, BinanceSwapWsType};
  12. // use exchanges::kucoin_swap_ws::{KucoinSwapLogin, KucoinSwapSubscribeType, KucoinSwapWs, KucoinSwapWsType};
  13. // use exchanges::kucoin_spot_ws::{KucoinSpotLogin, KucoinSpotSubscribeType, KucoinSpotWs, KucoinSpotWsType};
  14. // use exchanges::gate_swap_ws::{GateSwapLogin, GateSwapSubscribeType, GateSwapWs, GateSwapWsType};
  15. // use exchanges::bitget_spot_ws::{BitgetSpotLogin, BitgetSpotSubscribeType, BitgetSpotWs, BitgetSpotWsType};
  16. // use exchanges::okx_swap_ws::{OkxSwapLogin, OkxSwapSubscribeType, OkxSwapWs, OkxSwapWsType};
  17. use exchanges::htx_swap_ws::{HtxSwapLogin, HtxSwapSubscribeType, HtxSwapWs, HtxSwapWsType};
  18. use exchanges::response_base::ResponseData;
  19. use standard::exchange::{Exchange, ExchangeEnum};
  20. // use standard::{binance_spot_handle, Order, Platform, utils};
  21. // use standard::{binance_handle, Order, Platform, utils};
  22. // use standard::{kucoin_handle, Order, Platform, utils};
  23. // use standard::{kucoin_spot_handle, Order, Platform, utils};
  24. // use standard::{gate_swap_handle, handle_info, Order, Platform, utils};
  25. // use standard::{okx_handle, Order, Platform, utils};
  26. // use standard::{bitget_spot_handle, Order, Platform, utils};
  27. // use standard::{htx_swap_handle, handle_info, Order, Platform, utils};
  28. use standard::{phemex_swap_handle, handle_info, Order, Platform, utils};
  29. // 创建实体
  30. #[allow(dead_code)]
  31. pub async fn test_new_exchange(exchange: ExchangeEnum, symbol: &str) -> Box<dyn Platform> {
  32. match exchange {
  33. ExchangeEnum::PhemexSwap => utils::proxy_handle(Some("phemex")),
  34. _ => utils::proxy_handle(None)
  35. }
  36. let (order_sender, _order_receiver): (Sender<Order>, Receiver<Order>) = channel(1024);
  37. let (error_sender, _error_receiver): (Sender<Error>, Receiver<Error>) = channel(1024);
  38. let account_info = global::account_info::get_account_info("../test_account.toml");
  39. match exchange {
  40. ExchangeEnum::BinanceSwap => {
  41. let mut params: BTreeMap<String, String> = BTreeMap::new();
  42. let access_key = account_info.binance_access_key;
  43. let secret_key = account_info.binance_secret_key;
  44. params.insert("access_key".to_string(), access_key);
  45. params.insert("secret_key".to_string(), secret_key);
  46. Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  47. }
  48. // ExchangeEnum::BinanceSpot => {
  49. // let mut params: BTreeMap<String, String> = BTreeMap::new();
  50. // let access_key = account_info.binance_access_key;
  51. // let secret_key = account_info.binance_secret_key;
  52. // params.insert("access_key".to_string(), access_key);
  53. // params.insert("secret_key".to_string(), secret_key);
  54. // Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  55. // }
  56. ExchangeEnum::GateSwap => {
  57. let mut params: BTreeMap<String, String> = BTreeMap::new();
  58. let access_key = account_info.gate_access_key;
  59. let secret_key = account_info.gate_secret_key;
  60. params.insert("access_key".to_string(), access_key);
  61. params.insert("secret_key".to_string(), secret_key);
  62. Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  63. }
  64. // ExchangeEnum::GateSpot => {
  65. // let mut params: BTreeMap<String, String> = BTreeMap::new();
  66. // let access_key = account_info.gate_access_key;
  67. // let secret_key = account_info.gate_secret_key;
  68. // params.insert("access_key".to_string(), access_key);
  69. // params.insert("secret_key".to_string(), secret_key);
  70. // Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  71. // }
  72. ExchangeEnum::KucoinSwap => {
  73. let mut params: BTreeMap<String, String> = BTreeMap::new();
  74. let access_key = account_info.kucoin_access_key;
  75. let secret_key = account_info.kucoin_secret_key;
  76. let pass_key = account_info.kucoin_pass;
  77. params.insert("access_key".to_string(), access_key);
  78. params.insert("secret_key".to_string(), secret_key);
  79. params.insert("pass_key".to_string(), pass_key);
  80. Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  81. }
  82. // ExchangeEnum::KucoinSpot => {
  83. // let mut params: BTreeMap<String, String> = BTreeMap::new();
  84. // let access_key = account_info.kucoin_access_key;
  85. // let secret_key = account_info.kucoin_secret_key;
  86. // let pass_key = account_info.kucoin_pass;
  87. // params.insert("access_key".to_string(), access_key);
  88. // params.insert("secret_key".to_string(), secret_key);
  89. // params.insert("pass_key".to_string(), pass_key);
  90. // Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  91. // }
  92. // ExchangeEnum::OkxSwap => {
  93. // let mut params: BTreeMap<String, String> = BTreeMap::new();
  94. // let access_key = account_info.okx_access_key;
  95. // let secret_key = account_info.okx_secret_key;
  96. // let pass_key = account_info.okx_pass;
  97. // params.insert("access_key".to_string(), access_key);
  98. // params.insert("secret_key".to_string(), secret_key);
  99. // params.insert("pass_key".to_string(), pass_key);
  100. // Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  101. // }
  102. // ExchangeEnum::BitgetSpot => {
  103. // let mut params: BTreeMap<String, String> = BTreeMap::new();
  104. // let access_key = account_info.bitget_access_key;
  105. // let secret_key = account_info.bitget_secret_key;
  106. // let pass_key = account_info.bitget_pass;
  107. // params.insert("access_key".to_string(), access_key);
  108. // params.insert("secret_key".to_string(), secret_key);
  109. // params.insert("pass_key".to_string(), pass_key);
  110. // Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  111. // }
  112. ExchangeEnum::BitgetSwap => {
  113. let mut params: BTreeMap<String, String> = BTreeMap::new();
  114. let access_key = account_info.bitget_access_key;
  115. let secret_key = account_info.bitget_secret_key;
  116. let pass_key = account_info.bitget_pass;
  117. params.insert("access_key".to_string(), access_key);
  118. params.insert("secret_key".to_string(), secret_key);
  119. params.insert("pass_key".to_string(), pass_key);
  120. Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  121. }
  122. ExchangeEnum::BybitSwap => {
  123. let mut params: BTreeMap<String, String> = BTreeMap::new();
  124. let access_key = account_info.bybit_access_key;
  125. let secret_key = account_info.bybit_secret_key;
  126. let pass_key = account_info.bybit_pass;
  127. params.insert("access_key".to_string(), access_key);
  128. params.insert("secret_key".to_string(), secret_key);
  129. params.insert("pass_key".to_string(), pass_key);
  130. Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  131. }
  132. ExchangeEnum::HtxSwap => {
  133. let mut params: BTreeMap<String, String> = BTreeMap::new();
  134. let access_key = account_info.htx_access_key;
  135. let secret_key = account_info.htx_secret_key;
  136. let pass_key = account_info.htx_pass;
  137. params.insert("access_key".to_string(), access_key);
  138. params.insert("secret_key".to_string(), secret_key);
  139. params.insert("pass_key".to_string(), pass_key);
  140. Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  141. }
  142. ExchangeEnum::PhemexSwap => {
  143. let mut params: BTreeMap<String, String> = BTreeMap::new();
  144. let access_key = account_info.phemex_access_key;
  145. let secret_key = account_info.phemex_secret_key;
  146. let pass_key = account_info.phemex_pass;
  147. params.insert("access_key".to_string(), access_key);
  148. params.insert("secret_key".to_string(), secret_key);
  149. params.insert("pass_key".to_string(), pass_key);
  150. Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  151. }
  152. _ => {
  153. panic!("该交易所未实现!")
  154. }
  155. }
  156. }
  157. #[allow(dead_code)]
  158. pub async fn test_new_exchange_wss<T>(exchange: ExchangeEnum, symbol: &str, subscriber_type: T, mold: &str) where Vec<HtxSwapSubscribeType>: From<T> {
  159. match exchange {
  160. ExchangeEnum::PhemexSwap => utils::proxy_handle(Some("phemex")),
  161. _ => utils::proxy_handle(None)
  162. }
  163. let account_info = global::account_info::get_account_info("../test_account.toml");
  164. match exchange {
  165. // ExchangeEnum::BinanceSpot => {
  166. // let symbol_format = utils::format_symbol(symbol.to_string(), "").to_uppercase();
  167. // trace!(symbol_format);
  168. // let name = format!("binance_spot@{}", symbol.to_string().to_lowercase());
  169. // let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  170. // let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
  171. // let write_tx_am = Arc::new(Mutex::new(write_tx));
  172. // let is_shutdown_arc = Arc::new(AtomicBool::new(true));
  173. //
  174. // let params = BinanceSpotLogin {
  175. // api_key: account_info.binance_access_key,
  176. // api_secret: account_info.binance_secret_key,
  177. // };
  178. // let mut exchange_wss;
  179. // exchange_wss = BinanceSpotWs::new_label(name, false, Option::from(params), BinanceSpotWsType::PublicAndPrivate);
  180. // exchange_wss.set_symbols(vec![symbol_format]);
  181. // exchange_wss.set_subscribe(subscriber_type.into());
  182. //
  183. //
  184. // let mold_arc = Arc::new(mold.to_string());
  185. // //读取
  186. // tokio::spawn(async move {
  187. // let mold_clone = Arc::clone(&mold_arc);
  188. // loop {
  189. // if let Some(data) = read_rx.next().await {
  190. // trace!("原始数据 data:{:?}",data);
  191. // match mold_clone.as_str() {
  192. // "depth" => {
  193. // if data.data != "" {
  194. // let result = binance_spot_handle::handle_special_depth(data);
  195. // trace!(?result)
  196. // }
  197. // }
  198. // "ticker" => {
  199. // if data.data != "" {
  200. // let result = binance_spot_handle::handle_special_ticker(data);
  201. // trace!(?result)
  202. // }
  203. // }
  204. // _ => {
  205. // error!("没有该命令!mode={}", mold_clone);
  206. // panic!("没有该命令!mode={}", mold_clone)
  207. // }
  208. // }
  209. // }
  210. // };
  211. // });
  212. //
  213. // let t1 = tokio::spawn(async move {
  214. // //链接
  215. // let bool_v3_clone = Arc::clone(&is_shutdown_arc);
  216. // exchange_wss.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  217. // });
  218. // try_join!(t1).unwrap();
  219. // }
  220. ExchangeEnum::BinanceSwap => {
  221. // let symbol_format = utils::format_symbol(symbol.to_string(), "").to_uppercase();
  222. // trace!(symbol_format);
  223. // let name = format!("binance_swap@{}", symbol.to_string().to_lowercase());
  224. // let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  225. // let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
  226. // let write_tx_am = Arc::new(Mutex::new(write_tx));
  227. // let is_shutdown_arc = Arc::new(AtomicBool::new(true));
  228. //
  229. // let params = BinanceSwapLogin {
  230. // api_key: account_info.binance_access_key,
  231. // api_secret: account_info.binance_secret_key,
  232. // };
  233. // let mut exchange_wss;
  234. // exchange_wss = BinanceSwapWs::new_label(name, false, Option::from(params), BinanceSwapWsType::PublicAndPrivate);
  235. // exchange_wss.set_symbols(vec![symbol_format]);
  236. // exchange_wss.set_subscribe(subscriber_type.into());
  237. //
  238. //
  239. // let mold_arc = Arc::new(mold.to_string());
  240. // //读取
  241. // tokio::spawn(async move {
  242. // let mold_clone = Arc::clone(&mold_arc);
  243. // loop {
  244. // if let Some(data) = read_rx.next().await {
  245. // trace!("原始数据 data:{:?}",data);
  246. // match mold_clone.as_str() {
  247. // "depth" => {
  248. // if data.data != "" {
  249. // let result = binance_handle::handle_special_depth(data);
  250. // trace!(?result)
  251. // }
  252. // }
  253. // "ticker" => {
  254. // if data.data != "" {
  255. // let result = binance_handle::handle_special_ticker(data);
  256. // trace!(?result)
  257. // }
  258. // }
  259. // _ => {
  260. // error!("没有该命令!mode={}", mold_clone);
  261. // panic!("没有该命令!mode={}", mold_clone)
  262. // }
  263. // }
  264. // }
  265. // };
  266. // });
  267. //
  268. // let t1 = tokio::spawn(async move {
  269. // //链接
  270. // let bool_v3_clone = Arc::clone(&is_shutdown_arc);
  271. // exchange_wss.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  272. // });
  273. // try_join!(t1).unwrap();
  274. }
  275. ExchangeEnum::KucoinSwap => {
  276. // let symbol_format = format!("{}M", utils::format_symbol(symbol.to_string(), ""));
  277. // let symbol_back = utils::format_symbol(symbol.to_string(), "_");
  278. //
  279. // let name = format!("kucoin_swap@{}", symbol.to_string().to_lowercase());
  280. // let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  281. // let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
  282. // let write_tx_am = Arc::new(Mutex::new(write_tx));
  283. // let is_shutdown_arc = Arc::new(AtomicBool::new(true));
  284. //
  285. // let params = KucoinSwapLogin {
  286. // access_key: account_info.kucoin_access_key,
  287. // secret_key: account_info.kucoin_secret_key,
  288. // pass_key: account_info.kucoin_pass,
  289. // };
  290. // let mut exchange_wss;
  291. // if ["depth", "ticker"].contains(&mold) {
  292. // exchange_wss = KucoinSwapWs::new_label(name, false, Option::from(params), KucoinSwapWsType::Public).await;
  293. // } else {
  294. // exchange_wss = KucoinSwapWs::new_label(name, false, Option::from(params), KucoinSwapWsType::Private).await;
  295. // }
  296. // exchange_wss.set_symbols(vec![symbol_format]);
  297. // exchange_wss.set_subscribe(subscriber_type.into());
  298. //
  299. // let mold_arc = Arc::new(mold.to_string());
  300. // tokio::spawn(async move {
  301. // let mold_clone = Arc::clone(&mold_arc);
  302. // loop {
  303. // if let Some(data) = read_rx.next().await {
  304. // trace!("原始数据 data:{:?}",data);
  305. // match mold_clone.as_str() {
  306. // "depth" => {
  307. // let result = kucoin_handle::handle_special_depth(data);
  308. // trace!(?result)
  309. // }
  310. // "ticker" => {
  311. // let result = kucoin_handle::handle_special_ticker(data);
  312. // trace!(?result)
  313. // }
  314. // "account" => {
  315. // let result = kucoin_handle::handle_account_info(data, symbol_back.clone());
  316. // trace!(?result)
  317. // }
  318. // "position" => {
  319. // let result = kucoin_handle::handle_position(data, dec!(1));
  320. // trace!(?result)
  321. // }
  322. // "orders" => {
  323. // let result = kucoin_handle::handle_order(data, dec!(0.001));
  324. // trace!(?result)
  325. // }
  326. // _ => {
  327. // error!("没有该命令!mode={}", mold_clone);
  328. // panic!("没有该命令!mode={}", mold_clone)
  329. // }
  330. // }
  331. // }
  332. // }
  333. // });
  334. //
  335. // let t1 = tokio::spawn(async move {
  336. // //链接
  337. // let bool_v3_clone = Arc::clone(&is_shutdown_arc);
  338. // exchange_wss.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  339. // });
  340. // try_join!(t1).unwrap();
  341. }
  342. // ExchangeEnum::KucoinSpot => {
  343. // let symbol_format = utils::format_symbol(symbol.to_string(), "-").to_uppercase();
  344. // let symbol_back = utils::format_symbol(symbol.to_string(), "_");
  345. // trace!(symbol_format);
  346. // let name = format!("kucoin_spot@{}", symbol.to_string().to_lowercase());
  347. // let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  348. // let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
  349. // let write_tx_am = Arc::new(Mutex::new(write_tx));
  350. // let is_shutdown_arc = Arc::new(AtomicBool::new(true));
  351. //
  352. // let params = KucoinSpotLogin {
  353. // access_key: account_info.kucoin_access_key,
  354. // secret_key: account_info.kucoin_secret_key,
  355. // pass_key: account_info.kucoin_pass,
  356. // };
  357. // let mut exchange_wss = if ["depth", "ticker"].contains(&mold) {
  358. // KucoinSpotWs::new_label(name, false, Option::from(params), KucoinSpotWsType::Public).await
  359. // } else {
  360. // KucoinSpotWs::new_label(name, false, Option::from(params), KucoinSpotWsType::Private).await
  361. // };
  362. // exchange_wss.set_symbols(vec![symbol_format]);
  363. // exchange_wss.set_subscribe(subscriber_type.into());
  364. //
  365. // let mold_arc = Arc::new(mold.to_string());
  366. // tokio::spawn(async move {
  367. // let mold_clone = Arc::clone(&mold_arc);
  368. // loop {
  369. // if let Some(data) = read_rx.next().await {
  370. // trace!("原始数据 data:{:?}",data);
  371. // match mold_clone.as_str() {
  372. // "depth" => {
  373. // if data.data != "" {
  374. // let result = kucoin_spot_handle::handle_special_depth(data);
  375. // trace!(?result)
  376. // }
  377. // }
  378. // "ticker" => {
  379. // if data.data != "" {
  380. // let result = kucoin_spot_handle::handle_special_ticker(data);
  381. // trace!(?result)
  382. // }
  383. // }
  384. // "account" => {
  385. // if data.data != "" {
  386. // let result = kucoin_spot_handle::handle_account_info(data, symbol_back.clone());
  387. // trace!(?result)
  388. // }
  389. // }
  390. // "orders" => {
  391. // if data.data != "" {
  392. // let result = kucoin_spot_handle::handle_order(data, dec!(1));
  393. // trace!(?result)
  394. // }
  395. // }
  396. // _ => {
  397. // error!("没有该命令!mode={}", mold_clone);
  398. // panic!("没有该命令!mode={}", mold_clone)
  399. // }
  400. // }
  401. // }
  402. // }
  403. // });
  404. // let t1 = tokio::spawn(async move {
  405. // //链接
  406. // let bool_v3_clone = Arc::clone(&is_shutdown_arc);
  407. // exchange_wss.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  408. // });
  409. // try_join!(t1).unwrap();
  410. // }
  411. ExchangeEnum::GateSwap => {
  412. // let symbol_format = utils::format_symbol(symbol.to_string(), "_").to_uppercase();
  413. // trace!(symbol_format);
  414. // let name = format!("gate_swap@{}", symbol.to_string().to_lowercase());
  415. // let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  416. // let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
  417. // let write_tx_am = Arc::new(Mutex::new(write_tx));
  418. // let is_shutdown_arc = Arc::new(AtomicBool::new(true));
  419. //
  420. // let params = GateSwapLogin {
  421. // api_key: account_info.gate_access_key,
  422. // secret: account_info.gate_secret_key,
  423. // };
  424. // let mut exchange_wss = GateSwapWs::new_label(name, false, Option::from(params), GateSwapWsType::PublicAndPrivate("usdt".to_string()));
  425. // exchange_wss.set_symbols(vec![symbol_format.clone()]);
  426. // exchange_wss.set_subscribe(subscriber_type.into());
  427. //
  428. // let bool_v3_clone = Arc::clone(&is_shutdown_arc);
  429. // let mold_clone = mold.to_string().clone();
  430. // let fun = move |data: ResponseData| {
  431. // let symbol_format_c = symbol_format.clone();
  432. // let mold_cc = mold_clone.clone();
  433. //
  434. // async move {
  435. // trace!("原始数据 data:{:?}",data);
  436. // match mold_cc.as_str() {
  437. // "depth" => {
  438. // if data.data != "" {
  439. // let result = handle_info::format_depth(ExchangeEnum::GateSwap, &data);
  440. // trace!(?result)
  441. // }
  442. // }
  443. // "ticker" => {
  444. // if data.data != "" {
  445. // let result = gate_swap_handle::handle_book_ticker(&data);
  446. // trace!(?result)
  447. // }
  448. // }
  449. // "account" => {
  450. // if data.data != "" {
  451. // let result = gate_swap_handle::handle_account_info(&data, &symbol_format_c);
  452. // trace!(?result)
  453. // }
  454. // }
  455. // "orders" => {
  456. // if data.data != "" {
  457. // let result = gate_swap_handle::handle_order(data, dec!(1));
  458. // trace!(?result)
  459. // }
  460. // }
  461. // _ => {
  462. // error!("没有该命令!mode={}", mold_cc);
  463. // panic!("没有该命令!mode={}", mold_cc)
  464. // }
  465. // };
  466. // }
  467. // };
  468. // exchange_wss.ws_connect_async(bool_v3_clone, fun, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  469. }
  470. // ExchangeEnum::BitgetSpot => {
  471. // let symbol_format = utils::format_symbol(symbol.to_string(), "-").to_uppercase();
  472. // let symbol_back = utils::format_symbol(symbol.to_string(), "_");
  473. // trace!(symbol_format);
  474. // let name = format!("bitget_spot@{}", symbol.to_string().to_lowercase());
  475. // let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  476. // let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
  477. // let write_tx_am = Arc::new(Mutex::new(write_tx));
  478. // let is_shutdown_arc = Arc::new(AtomicBool::new(true));
  479. //
  480. // let params = BitgetSpotLogin {
  481. // api_key: account_info.bitget_access_key,
  482. // secret_key: account_info.bitget_secret_key,
  483. // passphrase_key: account_info.bitget_pass,
  484. // };
  485. //
  486. // let mut exchange_wss = if ["depth", "ticker"].contains(&mold) {
  487. // BitgetSpotWs::new_label(name, false, Option::from(params), BitgetSpotWsType::Public)
  488. // } else {
  489. // BitgetSpotWs::new_label(name, false, Option::from(params), BitgetSpotWsType::Private)
  490. // };
  491. // exchange_wss.set_symbols(vec![symbol_format]);
  492. // exchange_wss.set_subscribe(subscriber_type.into());
  493. //
  494. // let mold_arc = Arc::new(mold.to_string());
  495. // //读取
  496. // tokio::spawn(async move {
  497. // loop {
  498. // let mold_clone = Arc::clone(&mold_arc);
  499. // if let Some(data) = read_rx.next().await {
  500. // trace!("原始数据 data:{:?}",data);
  501. // match mold_clone.as_str() {
  502. // "depth" => {
  503. // if data.data != "" {
  504. // let result = bitget_spot_handle::handle_special_depth(data);
  505. // trace!(?result)
  506. // }
  507. // }
  508. // "ticker" => {
  509. // if data.data != "" {
  510. // let result = bitget_spot_handle::handle_special_ticker(data);
  511. // trace!(?result)
  512. // }
  513. // }
  514. // "account" => {
  515. // if data.data != "" {
  516. // let result = bitget_spot_handle::handle_account_info(data, symbol_back.clone());
  517. // trace!(?result)
  518. // }
  519. // }
  520. // "orders" => {
  521. // if data.data != "" {
  522. // let result = bitget_spot_handle::handle_order(data, dec!(1));
  523. // trace!(?result)
  524. // }
  525. // }
  526. // _ => {
  527. // error!("没有该命令!mode={}", mold_clone);
  528. // panic!("没有该命令!mode={}", mold_clone)
  529. // }
  530. // }
  531. // }
  532. // }
  533. // });
  534. // let t1 = tokio::spawn(async move {
  535. // //链接
  536. // let bool_v3_clone = Arc::clone(&is_shutdown_arc);
  537. // exchange_wss.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  538. // });
  539. // try_join!(t1).unwrap();
  540. // }
  541. // ExchangeEnum::OkxSwap => {
  542. // let symbol_format = utils::format_symbol(symbol.to_string(), "-").to_uppercase();
  543. // trace!(symbol_format);
  544. // let name = format!("okx_swap@{}", symbol.to_string().to_lowercase());
  545. // let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  546. // let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
  547. // let write_tx_am = Arc::new(Mutex::new(write_tx));
  548. // let is_shutdown_arc = Arc::new(AtomicBool::new(true));
  549. //
  550. // let params = OkxSwapLogin {
  551. // api_key: account_info.okx_access_key,
  552. // secret_key: account_info.okx_secret_key,
  553. // passphrase: account_info.okx_pass,
  554. // };
  555. //
  556. // let mut exchange_wss = if ["depth", "ticker"].contains(&mold) {
  557. // OkxSwapWs::new_label(name, false, Option::from(params), OkxSwapWsType::Public)
  558. // } else if ["account", "orders", "position"].contains(&mold) {
  559. // OkxSwapWs::new_label(name, false, Option::from(params), OkxSwapWsType::Private)
  560. // } else {
  561. // OkxSwapWs::new_label(name, false, Option::from(params), OkxSwapWsType::Business)
  562. // };
  563. //
  564. // exchange_wss.set_symbols(vec![symbol_format.clone()]);
  565. // exchange_wss.set_subscribe(subscriber_type.into());
  566. //
  567. // let mold_arc = Arc::new(mold.to_string());
  568. // tokio::spawn(async move {
  569. // let mold_clone = Arc::clone(&mold_arc);
  570. // loop {
  571. // if let Some(data) = read_rx.next().await {
  572. // trace!("原始数据 data:{:?}",data);
  573. // match mold_clone.as_str() {
  574. // "depth" => {
  575. // if data.data != "" {
  576. // let result = okx_handle::handle_special_depth(data);
  577. // trace!(?result)
  578. // }
  579. // }
  580. // "ticker" => {
  581. // if data.data != "" {
  582. // let result = okx_handle::handle_special_ticker(data);
  583. // trace!(?result)
  584. // }
  585. // }
  586. // "account" => {
  587. // if data.data != "" {
  588. // let result = okx_handle::handle_account_info(data, symbol_format.clone());
  589. // trace!(?result)
  590. // }
  591. // }
  592. // "position" => {
  593. // if data.data != "" {
  594. // let result = okx_handle::handle_position(data, dec!(10));
  595. // trace!(?result)
  596. // }
  597. // }
  598. // "orders" => {
  599. // if data.data != "" {
  600. // let result = okx_handle::handle_order(data, dec!(10));
  601. // trace!(?result)
  602. // }
  603. // }
  604. // _ => {
  605. // error!("没有该命令!mode={}", mold_clone);
  606. // panic!("没有该命令!mode={}", mold_clone)
  607. // }
  608. // }
  609. // }
  610. // }
  611. // });
  612. //
  613. // let t1 = tokio::spawn(async move {
  614. // //链接
  615. // let bool_v3_clone = Arc::clone(&is_shutdown_arc);
  616. // exchange_wss.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  617. // });
  618. // try_join!(t1).unwrap();
  619. // }
  620. // ExchangeEnum::HtxSwap => {
  621. // let symbol_format = utils::format_symbol(symbol.to_string(), "-").to_uppercase();
  622. // trace!(symbol_format);
  623. // let name = format!("htx_swap@{}", symbol.to_string().to_lowercase());
  624. // let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  625. // let write_tx_am = Arc::new(Mutex::new(write_tx));
  626. // let is_shutdown_arc = Arc::new(AtomicBool::new(true));
  627. //
  628. // let params = HtxSwapLogin {
  629. // api_key: account_info.htx_access_key,
  630. // secret: account_info.htx_secret_key,
  631. // };
  632. //
  633. // let htx_wss_type = match mold.to_string().clone().as_str() {
  634. // "depth" => HtxSwapWsType::Public,
  635. // _ => HtxSwapWsType::Private
  636. // };
  637. //
  638. // let mut exchange_wss = HtxSwapWs::new_label(name, Option::from(params), htx_wss_type);
  639. // exchange_wss.set_symbols(vec![symbol_format.clone()]);
  640. // exchange_wss.set_subscribe(subscriber_type.into());
  641. // let bool_v3_clone = Arc::clone(&is_shutdown_arc);
  642. // let mold_clone = mold.to_string().clone();
  643. // let fun = move |data: ResponseData| {
  644. // let symbol_format_c = symbol_format.clone();
  645. // let mold_cc = mold_clone.clone();
  646. //
  647. // async move {
  648. // trace!("原始数据 data:{:?}",data);
  649. // match mold_cc.as_str() {
  650. // "depth" => {
  651. // if data.data != "" {
  652. // let result = handle_info::format_depth(ExchangeEnum::HtxSwap, &data);
  653. // trace!("-------------------------------");
  654. // trace!(?result)
  655. // }
  656. // }
  657. // "position" => {
  658. // if data.data != "" {
  659. // let result = htx_swap_handle::handle_position(&data, &dec!(10));
  660. // trace!("-------------------------------");
  661. // trace!(?result)
  662. // }
  663. // }
  664. // "account" => {
  665. // if data.data != "" {
  666. // let result = htx_swap_handle::handle_account_info(&data, &symbol_format_c);
  667. // trace!("-------------------------------");
  668. // trace!(?result)
  669. // }
  670. // }
  671. // "orders" => {
  672. // println!("{:?}", data);
  673. // if data.data != "" {
  674. // let result = htx_swap_handle::handle_order(data, dec!(10));
  675. // trace!("-------------------------------");
  676. // trace!(?result)
  677. // }
  678. // }
  679. // _ => {
  680. // error!("没有该命令!mode={}", mold_cc);
  681. // panic!("没有该命令!mode={}", mold_cc)
  682. // }
  683. // };
  684. // }
  685. // };
  686. // exchange_wss.ws_connect_async(bool_v3_clone, fun, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  687. // }
  688. _ => {
  689. error!("该交易所不支持!test_new_exchange_wss:{:?}", exchange);
  690. panic!("该交易所不支持!test_new_exchange_wss:{:?}", exchange)
  691. }
  692. }
  693. }