exchange_test.rs 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454
  1. use std::collections::{BTreeMap};
  2. use std::{env};
  3. use std::io::{Error};
  4. use std::sync::Arc;
  5. use std::sync::atomic::AtomicBool;
  6. use std::time::Duration;
  7. use rust_decimal_macros::dec;
  8. use tokio::sync::mpsc::{channel, Receiver, Sender};
  9. use tokio::try_join;
  10. use tracing::{error, trace};
  11. // use exchanges::bitget_spot_ws::{BitgetSpotWs, BitgetSubscribeType, BitgetWsType};
  12. use exchanges::kucoin_spot_ws::{KucoinSpotWs, KucoinSubscribeType, KucoinWsType};
  13. // use exchanges::binance_spot_ws::{BinanceSpotSubscribeType, BinanceSpotWs, BinanceSpotWsType};
  14. // use exchanges::okx_swap_ws::{OkxSubscribeType, OkxSwapWs, OkxWsType};
  15. // use exchanges::kucoin_swap_ws::{KucoinSubscribeType, KucoinSwapWs, KucoinWsType};
  16. use exchanges::response_base::ResponseData;
  17. use standard::exchange::{Exchange, ExchangeEnum};
  18. // use standard::{binance_handle, Order, Platform, utils};
  19. // use standard::{okx_handle, Order, Platform, utils};
  20. // use standard::{kucoin_handle, Order, Platform, utils};
  21. use standard::{kucoin_spot_handle, Order, Platform, utils};
  22. // use standard::{bitget_spot_handle, Order, Platform, utils};
  23. // 创建实体
  24. #[allow(dead_code)]
  25. pub async fn test_new_exchange(exchange: ExchangeEnum, symbol: &str) -> Box<dyn Platform> {
  26. utils::proxy_handle();
  27. let (order_sender, _order_receiver): (Sender<Order>, Receiver<Order>) = channel(1024);
  28. let (error_sender, _error_receiver): (Sender<Error>, Receiver<Error>) = channel(1024);
  29. let account_info = global::account_info::get_account_info("../test_account.toml");
  30. match exchange {
  31. ExchangeEnum::BinanceSwap => {
  32. let mut params: BTreeMap<String, String> = BTreeMap::new();
  33. let access_key = account_info.binance_access_key;
  34. let secret_key = account_info.binance_secret_key;
  35. params.insert("access_key".to_string(), access_key);
  36. params.insert("secret_key".to_string(), secret_key);
  37. Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  38. }
  39. ExchangeEnum::BinanceSpot => {
  40. let mut params: BTreeMap<String, String> = BTreeMap::new();
  41. let access_key = account_info.binance_access_key;
  42. let secret_key = account_info.binance_secret_key;
  43. params.insert("access_key".to_string(), access_key);
  44. params.insert("secret_key".to_string(), secret_key);
  45. Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  46. }
  47. ExchangeEnum::GateSwap => {
  48. let mut params: BTreeMap<String, String> = BTreeMap::new();
  49. let access_key = account_info.gate_access_key;
  50. let secret_key = account_info.gate_secret_key;
  51. params.insert("access_key".to_string(), access_key);
  52. params.insert("secret_key".to_string(), secret_key);
  53. Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  54. }
  55. ExchangeEnum::GateSpot => {
  56. let mut params: BTreeMap<String, String> = BTreeMap::new();
  57. let access_key = account_info.gate_access_key;
  58. let secret_key = account_info.gate_secret_key;
  59. params.insert("access_key".to_string(), access_key);
  60. params.insert("secret_key".to_string(), secret_key);
  61. Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  62. }
  63. ExchangeEnum::KucoinSwap => {
  64. let mut params: BTreeMap<String, String> = BTreeMap::new();
  65. let access_key = account_info.kucoin_access_key;
  66. let secret_key = account_info.kucoin_secret_key;
  67. let pass_key = account_info.kucoin_pass;
  68. params.insert("access_key".to_string(), access_key);
  69. params.insert("secret_key".to_string(), secret_key);
  70. params.insert("pass_key".to_string(), pass_key);
  71. Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  72. }
  73. ExchangeEnum::KucoinSpot => {
  74. let mut params: BTreeMap<String, String> = BTreeMap::new();
  75. let access_key = account_info.kucoin_access_key;
  76. let secret_key = account_info.kucoin_secret_key;
  77. let pass_key = account_info.kucoin_pass;
  78. params.insert("access_key".to_string(), access_key);
  79. params.insert("secret_key".to_string(), secret_key);
  80. params.insert("pass_key".to_string(), pass_key);
  81. Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  82. }
  83. ExchangeEnum::OkxSwap => {
  84. let mut params: BTreeMap<String, String> = BTreeMap::new();
  85. let access_key = account_info.okx_access_key;
  86. let secret_key = account_info.okx_secret_key;
  87. let pass_key = account_info.okx_pass;
  88. params.insert("access_key".to_string(), access_key);
  89. params.insert("secret_key".to_string(), secret_key);
  90. params.insert("pass_key".to_string(), pass_key);
  91. Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  92. }
  93. ExchangeEnum::BitgetSpot => {
  94. let mut params: BTreeMap<String, String> = BTreeMap::new();
  95. let access_key = account_info.bitget_access_key;
  96. let secret_key = account_info.bitget_secret_key;
  97. let pass_key = account_info.bitget_pass;
  98. params.insert("access_key".to_string(), access_key);
  99. params.insert("secret_key".to_string(), secret_key);
  100. params.insert("pass_key".to_string(), pass_key);
  101. Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  102. }
  103. }
  104. }
  105. #[allow(dead_code)]
  106. pub async fn test_new_exchange_wss<T>(exchange: ExchangeEnum, symbol: &str, subscriber_type: T, mold: &str) where Vec<KucoinSubscribeType>: From<T> {
  107. utils::proxy_handle();
  108. match exchange {
  109. ExchangeEnum::BinanceSpot => {
  110. // let symbol_format = utils::format_symbol(symbol.to_string(), "").to_uppercase();
  111. // trace!(symbol_format);
  112. // let name = format!("binance_usdt_swap@{}", symbol.to_string().to_lowercase());
  113. // let bool_v1 = Arc::new(AtomicBool::new(true));
  114. //
  115. // let (res_sender, mut res_receiver): (Sender<ResponseData>, Receiver<ResponseData>) = channel(1024);
  116. // let mut params: BTreeMap<String, String> = BTreeMap::new();
  117. // let access_key = env::var("binance_access_key").unwrap_or("".to_string());
  118. // let secret_key = env::var("binance_secret_key").unwrap_or("".to_string());
  119. // params.insert("access_key".to_string(), access_key);
  120. // params.insert("secret_key".to_string(), secret_key);
  121. // let mut exchange_wss;
  122. // exchange_wss = BinanceSpotWs::new_label(name, false, params, BinanceSpotWsType::PublicAndPrivate, res_sender);
  123. // exchange_wss.set_subscribe(subscriber_type.into());
  124. //
  125. // let t1 = tokio::spawn(async move {
  126. // exchange_wss.custom_subscribe(bool_v1, vec![symbol_format]).await;
  127. // });
  128. // let mold_arc = Arc::new(mold.to_string());
  129. // let t2 = tokio::spawn(async move {
  130. // let mold_clone = Arc::clone(&mold_arc);
  131. // loop {
  132. // tokio::time::sleep(Duration::from_millis(1)).await;
  133. // if let Ok(received) = res_receiver.try_recv() {
  134. // match mold_clone.as_str() {
  135. // "depth" => {
  136. // if received.data != "" {
  137. // let result = binance_handle::handle_special_depth(received);
  138. // trace!(?result)
  139. // }
  140. // }
  141. // "ticker" => {
  142. // if received.data != "" {
  143. // let result = binance_handle::handle_special_ticker(received);
  144. // trace!(?result)
  145. // }
  146. // }
  147. // _ => {
  148. // error!("没有该命令!mode={}", mold_clone);
  149. // panic!("没有该命令!mode={}", mold_clone)
  150. // }
  151. // }
  152. // }
  153. // }
  154. // });
  155. // try_join!(t1, t2).unwrap();
  156. error!("该交易所不支持!test_new_exchange_wss:{:?}",exchange);
  157. panic!("该交易所不支持!test_new_exchange_wss:{:?}", exchange)
  158. }
  159. ExchangeEnum::BinanceSwap => {
  160. error!("该交易所不支持!test_new_exchange_wss:{:?}",exchange);
  161. panic!("该交易所不支持!test_new_exchange_wss:{:?}", exchange)
  162. }
  163. ExchangeEnum::GateSwap => {
  164. error!("该交易所不支持!test_new_exchange_wss:{:?}",exchange);
  165. panic!("该交易所不支持!test_new_exchange_wss:{:?}", exchange)
  166. }
  167. ExchangeEnum::KucoinSwap => {
  168. // let symbol_format = format!("{}M", utils::format_symbol(symbol.to_string(), ""));
  169. // let symbol_back = utils::format_symbol(symbol.to_string(), "_");
  170. // let name = format!("kucoin_usdt_swap@{}", symbol.to_string().to_lowercase());
  171. // let bool_v1 = Arc::new(AtomicBool::new(true));
  172. //
  173. // let (res_sender, mut res_receiver): (Sender<ResponseData>, Receiver<ResponseData>) = channel(1024);
  174. // let mut params: BTreeMap<String, String> = BTreeMap::new();
  175. // let access_key = env::var("kucoin_access_key").unwrap_or("".to_string());
  176. // let secret_key = env::var("kucoin_secret_key").unwrap_or("".to_string());
  177. // let pass_key = env::var("kucoin_pass_key").unwrap_or("".to_string());
  178. // params.insert("access_key".to_string(), access_key);
  179. // params.insert("secret_key".to_string(), secret_key);
  180. // params.insert("pass_key".to_string(), pass_key);
  181. // let mut exchange_wss;
  182. // if ["depth", "ticker"].contains(&mold) {
  183. // exchange_wss = KucoinSwapWs::new_label(name, false, params, KucoinWsType::Public, res_sender).await
  184. // } else {
  185. // exchange_wss = KucoinSwapWs::new_label(name, false, params, KucoinWsType::Private, res_sender).await
  186. // }
  187. // exchange_wss.set_subscribe(subscriber_type.into());
  188. //
  189. // let t1 = tokio::spawn(async move {
  190. // exchange_wss.custom_subscribe(bool_v1, vec![symbol_format]).await;
  191. // });
  192. // let mold_arc = Arc::new(mold.to_string());
  193. // let t2 = tokio::spawn(async move {
  194. // let mold_clone = Arc::clone(&mold_arc);
  195. // loop {
  196. // tokio::time::sleep(Duration::from_millis(1)).await;
  197. // if let Ok(received) = res_receiver.try_recv() {
  198. // match mold_clone.as_str() {
  199. // "depth" => {
  200. // let result = kucoin_handle::handle_special_depth(received);
  201. // trace!(?result)
  202. // }
  203. // "ticker" => {
  204. // let result = kucoin_handle::handle_special_ticker(received);
  205. // trace!(?result)
  206. // }
  207. // "account" => {
  208. // trace!(?received);
  209. // let result = kucoin_handle::handle_account_info(received, symbol_back.clone());
  210. // trace!(?result)
  211. // }
  212. // "position" => {
  213. // trace!(?received);
  214. // let result = kucoin_handle::handle_position(received, dec!(1));
  215. // trace!(?result)
  216. // }
  217. // "orders" => {
  218. // trace!(?received);
  219. // let result = kucoin_handle::handle_order(received, dec!(0.001));
  220. // trace!(?result)
  221. // }
  222. // _ => {
  223. // error!("没有该命令!mode={}", mold_clone);
  224. // panic!("没有该命令!mode={}", mold_clone)
  225. // }
  226. // }
  227. // }
  228. // }
  229. // });
  230. // try_join!(t1, t2).unwrap();
  231. }
  232. ExchangeEnum::KucoinSpot => {
  233. let symbol_format = utils::format_symbol(symbol.to_string(), "-").to_uppercase();
  234. let symbol_back = utils::format_symbol(symbol.to_string(), "_");
  235. trace!(symbol_format);
  236. let name = format!("kucoin_usdt_spot@{}", symbol.to_string().to_lowercase());
  237. let bool_v1 = Arc::new(AtomicBool::new(true));
  238. let (res_sender, mut res_receiver): (Sender<ResponseData>, Receiver<ResponseData>) = channel(1024);
  239. let mut params: BTreeMap<String, String> = BTreeMap::new();
  240. let access_key = env::var("kucoin_access_key").unwrap_or("".to_string());
  241. let secret_key = env::var("kucoin_secret_key").unwrap_or("".to_string());
  242. let pass_key = env::var("kucoin_pass_key").unwrap_or("".to_string());
  243. params.insert("access_key".to_string(), access_key);
  244. params.insert("secret_key".to_string(), secret_key);
  245. params.insert("pass_key".to_string(), pass_key);
  246. let mut exchange_wss = if ["depth", "ticker"].contains(&mold) {
  247. KucoinSpotWs::new_label(name, false, params, KucoinWsType::Public, res_sender).await
  248. } else {
  249. KucoinSpotWs::new_label(name, false, params, KucoinWsType::Private, res_sender).await
  250. };
  251. exchange_wss.set_subscribe(subscriber_type.into());
  252. let t1 = tokio::spawn(async move {
  253. exchange_wss.custom_subscribe(bool_v1, vec![symbol_format]).await;
  254. });
  255. let mold_arc = Arc::new(mold.to_string());
  256. let t2 = tokio::spawn(async move {
  257. let mold_clone = Arc::clone(&mold_arc);
  258. loop {
  259. tokio::time::sleep(Duration::from_millis(1)).await;
  260. if let Ok(received) = res_receiver.try_recv() {
  261. trace!(?received);
  262. match mold_clone.as_str() {
  263. "depth" => {
  264. if received.data != "" {
  265. let result = kucoin_spot_handle::handle_special_depth(received);
  266. trace!(?result)
  267. }
  268. }
  269. "ticker" => {
  270. if received.data != "" {
  271. let result = kucoin_spot_handle::handle_special_ticker(received);
  272. trace!(?result)
  273. }
  274. }
  275. "account" => {
  276. if received.data != "" {
  277. let result = kucoin_spot_handle::handle_account_info(received, symbol_back.clone());
  278. trace!(?result)
  279. }
  280. }
  281. "orders" => {
  282. if received.data != "" {
  283. let result = kucoin_spot_handle::handle_order(received, dec!(1));
  284. trace!(?result)
  285. }
  286. }
  287. _ => {
  288. error!("没有该命令!mode={}", mold_clone);
  289. panic!("没有该命令!mode={}", mold_clone)
  290. }
  291. }
  292. }
  293. }
  294. });
  295. try_join!(t1, t2).unwrap();
  296. }
  297. ExchangeEnum::BitgetSpot => {
  298. // let symbol_format = utils::format_symbol(symbol.to_string(), "-").to_uppercase();
  299. // let symbol_back = utils::format_symbol(symbol.to_string(), "_");
  300. // trace!(symbol_format);
  301. // let name = format!("bitget_usdt_spot@{}", symbol.to_string().to_lowercase());
  302. // let bool_v1 = Arc::new(AtomicBool::new(true));
  303. //
  304. // let (res_sender, mut res_receiver): (Sender<ResponseData>, Receiver<ResponseData>) = channel(1024);
  305. // let mut params: BTreeMap<String, String> = BTreeMap::new();
  306. // let access_key = env::var("bitget_access_key").unwrap_or("".to_string());
  307. // let secret_key = env::var("bitget_secret_key").unwrap_or("".to_string());
  308. // let pass_key = env::var("bitget_pass_key").unwrap_or("".to_string());
  309. // params.insert("access_key".to_string(), access_key);
  310. // params.insert("secret_key".to_string(), secret_key);
  311. // params.insert("pass_key".to_string(), pass_key);
  312. //
  313. // let mut exchange_wss = if ["depth", "ticker"].contains(&mold) {
  314. // BitgetSpotWs::new_label(name, false, params, BitgetWsType::Public, res_sender)
  315. // } else {
  316. // BitgetSpotWs::new_label(name, false, params, BitgetWsType::Private, res_sender)
  317. // };
  318. //
  319. // exchange_wss.set_subscribe(subscriber_type.into());
  320. //
  321. // let t1 = tokio::spawn(async move {
  322. // exchange_wss.custom_subscribe(bool_v1, vec![symbol_format]).await;
  323. // });
  324. // let mold_arc = Arc::new(mold.to_string());
  325. // let t2 = tokio::spawn(async move {
  326. // let mold_clone = Arc::clone(&mold_arc);
  327. // loop {
  328. // tokio::time::sleep(Duration::from_millis(1)).await;
  329. // if let Ok(received) = res_receiver.try_recv() {
  330. // match mold_clone.as_str() {
  331. // "depth" => {
  332. // if received.data != "" {
  333. // let result = bitget_spot_handle::handle_special_depth(received);
  334. // trace!(?result)
  335. // }
  336. // }
  337. // "ticker" => {
  338. // if received.data != "" {
  339. // let result = bitget_spot_handle::handle_special_ticker(received);
  340. // trace!(?result)
  341. // }
  342. // }
  343. // "account" => {
  344. // if received.data != "" {
  345. // let result = bitget_spot_handle::handle_account_info(received, symbol_back.clone());
  346. // trace!(?result)
  347. // }
  348. // }
  349. // "orders" => {
  350. // if received.data != "" {
  351. // let result = bitget_spot_handle::handle_order(received, dec!(1));
  352. // trace!(?result)
  353. // }
  354. // }
  355. // _ => {
  356. // error!("没有该命令!mode={}", mold_clone);
  357. // panic!("没有该命令!mode={}", mold_clone)
  358. // }
  359. // }
  360. // }
  361. // }
  362. // });
  363. // try_join!(t1, t2).unwrap();
  364. }
  365. ExchangeEnum::OkxSwap => {
  366. // let symbol_format = utils::format_symbol(symbol.to_string(), "-").to_uppercase();
  367. // let symbol_back = utils::format_symbol(symbol.to_string(), "_");
  368. // trace!(symbol_format);
  369. // let name = format!("okx_usdt_swap@{}", symbol.to_string().to_lowercase());
  370. // let bool_v1 = Arc::new(AtomicBool::new(true));
  371. //
  372. // let (res_sender, mut res_receiver): (Sender<ResponseData>, Receiver<ResponseData>) = channel(1024);
  373. // let mut params: BTreeMap<String, String> = BTreeMap::new();
  374. // let access_key = env::var("okx_access_key").unwrap_or("".to_string());
  375. // let secret_key = env::var("okx_secret_key").unwrap_or("".to_string());
  376. // let passphrase = env::var("okx_passphrase").unwrap_or("".to_string());
  377. // params.insert("access_key".to_string(), access_key);
  378. // params.insert("secret_key".to_string(), secret_key);
  379. // params.insert("pass_key".to_string(), passphrase);
  380. //
  381. // let mut exchange_wss = if ["depth", "ticker"].contains(&mold) {
  382. // OkxSwapWs::new_label(name, false, params, OkxWsType::Public, res_sender)
  383. // } else if ["account", "orders", "position"].contains(&mold) {
  384. // OkxSwapWs::new_label(name, false, params, OkxWsType::Private, res_sender)
  385. // } else {
  386. // OkxSwapWs::new_label(name, false, params, OkxWsType::Business, res_sender)
  387. // };
  388. //
  389. // exchange_wss.set_subscribe(subscriber_type.into());
  390. //
  391. // let t1 = tokio::spawn(async move {
  392. // exchange_wss.custom_subscribe(bool_v1, vec![symbol_format]).await;
  393. // });
  394. // let mold_arc = Arc::new(mold.to_string());
  395. // let t2 = tokio::spawn(async move {
  396. // let mold_clone = Arc::clone(&mold_arc);
  397. // loop {
  398. // tokio::time::sleep(Duration::from_millis(1)).await;
  399. // if let Ok(received) = res_receiver.try_recv() {
  400. // match mold_clone.as_str() {
  401. // "depth" => {
  402. // if received.data != "" {
  403. // let result = okx_handle::handle_special_depth(received);
  404. // trace!(?result)
  405. // }
  406. // }
  407. // "ticker" => {
  408. // if received.data != "" {
  409. // let result = okx_handle::handle_special_ticker(received);
  410. // trace!(?result)
  411. // }
  412. // }
  413. // "account" => {
  414. // if received.data != "" {
  415. // let result = okx_handle::handle_account_info(received, symbol_back.clone());
  416. // trace!(?result)
  417. // }
  418. // }
  419. // "position" => {
  420. // if received.data != "" {
  421. // let result = okx_handle::handle_position(received, dec!(10));
  422. // trace!(?result)
  423. // }
  424. // }
  425. // "orders" => {
  426. // if received.data != "" {
  427. // let result = okx_handle::handle_order(received, dec!(10));
  428. // trace!(?result)
  429. // }
  430. // }
  431. // _ => {
  432. // error!("没有该命令!mode={}", mold_clone);
  433. // panic!("没有该命令!mode={}", mold_clone)
  434. // }
  435. // }
  436. // }
  437. // }
  438. // });
  439. // try_join!(t1, t2).unwrap();
  440. }
  441. _ => {
  442. error!("该交易所不支持!test_new_exchange_wss:{:?}",exchange);
  443. panic!("该交易所不支持!test_new_exchange_wss:{:?}", exchange)
  444. }
  445. }
  446. }