exchange_test.rs 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457
  1. use std::collections::{BTreeMap};
  2. use std::{env};
  3. use std::fs::File;
  4. use toml::from_str;
  5. use std::io::{Error, Read};
  6. use std::sync::Arc;
  7. use std::sync::atomic::AtomicBool;
  8. use std::time::Duration;
  9. use rust_decimal_macros::dec;
  10. use serde::Deserialize;
  11. use tokio::sync::mpsc::{channel, Receiver, Sender};
  12. use tokio::try_join;
  13. use tracing::{error, trace};
  14. // use exchanges::bitget_spot_ws::{BitgetSpotWs, BitgetSubscribeType, BitgetWsType};
  15. use exchanges::kucoin_spot_ws::{KucoinSpotWs, KucoinSubscribeType, KucoinWsType};
  16. // use exchanges::binance_spot_ws::{BinanceSpotSubscribeType, BinanceSpotWs, BinanceSpotWsType};
  17. // use exchanges::okx_swap_ws::{OkxSubscribeType, OkxSwapWs, OkxWsType};
  18. // use exchanges::kucoin_swap_ws::{KucoinSubscribeType, KucoinSwapWs, KucoinWsType};
  19. use exchanges::response_base::ResponseData;
  20. use standard::exchange::{Exchange, ExchangeEnum};
  21. // use standard::{binance_handle, Order, Platform, utils};
  22. // use standard::{okx_handle, Order, Platform, utils};
  23. // use standard::{kucoin_handle, Order, Platform, utils};
  24. use standard::{kucoin_spot_handle, Order, Platform, utils};
  25. // use standard::{bitget_spot_handle, Order, Platform, utils};
  26. // 创建实体
  27. #[allow(dead_code)]
  28. pub async fn test_new_exchange(exchange: ExchangeEnum, symbol: &str) -> Box<dyn Platform> {
  29. utils::proxy_handle();
  30. let (order_sender, _order_receiver): (Sender<Order>, Receiver<Order>) = channel(1024);
  31. let (error_sender, _error_receiver): (Sender<Error>, Receiver<Error>) = channel(1024);
  32. let account_info = global::account_info::get_account_info("../test_account.toml");
  33. match exchange {
  34. ExchangeEnum::BinanceSwap => {
  35. let mut params: BTreeMap<String, String> = BTreeMap::new();
  36. let access_key = account_info.binance_access_key;
  37. let secret_key = account_info.binance_secret_key;
  38. params.insert("access_key".to_string(), access_key);
  39. params.insert("secret_key".to_string(), secret_key);
  40. Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  41. }
  42. ExchangeEnum::BinanceSpot => {
  43. let mut params: BTreeMap<String, String> = BTreeMap::new();
  44. let access_key = account_info.binance_access_key;
  45. let secret_key = account_info.binance_secret_key;
  46. params.insert("access_key".to_string(), access_key);
  47. params.insert("secret_key".to_string(), secret_key);
  48. Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  49. }
  50. ExchangeEnum::GateSwap => {
  51. let mut params: BTreeMap<String, String> = BTreeMap::new();
  52. let access_key = account_info.gate_access_key;
  53. let secret_key = account_info.gate_secret_key;
  54. params.insert("access_key".to_string(), access_key);
  55. params.insert("secret_key".to_string(), secret_key);
  56. Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  57. }
  58. ExchangeEnum::GateSpot => {
  59. let mut params: BTreeMap<String, String> = BTreeMap::new();
  60. let access_key = account_info.gate_access_key;
  61. let secret_key = account_info.gate_secret_key;
  62. params.insert("access_key".to_string(), access_key);
  63. params.insert("secret_key".to_string(), secret_key);
  64. Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  65. }
  66. ExchangeEnum::KucoinSwap => {
  67. let mut params: BTreeMap<String, String> = BTreeMap::new();
  68. let access_key = account_info.kucoin_access_key;
  69. let secret_key = account_info.kucoin_secret_key;
  70. let pass_key = account_info.kucoin_pass;
  71. params.insert("access_key".to_string(), access_key);
  72. params.insert("secret_key".to_string(), secret_key);
  73. params.insert("pass_key".to_string(), pass_key);
  74. Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  75. }
  76. ExchangeEnum::KucoinSpot => {
  77. let mut params: BTreeMap<String, String> = BTreeMap::new();
  78. let access_key = account_info.kucoin_access_key;
  79. let secret_key = account_info.kucoin_secret_key;
  80. let pass_key = account_info.kucoin_pass;
  81. params.insert("access_key".to_string(), access_key);
  82. params.insert("secret_key".to_string(), secret_key);
  83. params.insert("pass_key".to_string(), pass_key);
  84. Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  85. }
  86. ExchangeEnum::OkxSwap => {
  87. let mut params: BTreeMap<String, String> = BTreeMap::new();
  88. let access_key = account_info.okx_access_key;
  89. let secret_key = account_info.okx_secret_key;
  90. let pass_key = account_info.okx_pass;
  91. params.insert("access_key".to_string(), access_key);
  92. params.insert("secret_key".to_string(), secret_key);
  93. params.insert("pass_key".to_string(), pass_key);
  94. Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  95. }
  96. ExchangeEnum::BitgetSpot => {
  97. let mut params: BTreeMap<String, String> = BTreeMap::new();
  98. let access_key = account_info.bitget_access_key;
  99. let secret_key = account_info.bitget_secret_key;
  100. let pass_key = account_info.bitget_pass;
  101. params.insert("access_key".to_string(), access_key);
  102. params.insert("secret_key".to_string(), secret_key);
  103. params.insert("pass_key".to_string(), pass_key);
  104. Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  105. }
  106. }
  107. }
  108. #[allow(dead_code)]
  109. pub async fn test_new_exchange_wss<T>(exchange: ExchangeEnum, symbol: &str, subscriber_type: T, mold: &str) where Vec<KucoinSubscribeType>: From<T> {
  110. utils::proxy_handle();
  111. match exchange {
  112. ExchangeEnum::BinanceSpot => {
  113. // let symbol_format = utils::format_symbol(symbol.to_string(), "").to_uppercase();
  114. // trace!(symbol_format);
  115. // let name = format!("binance_usdt_swap@{}", symbol.to_string().to_lowercase());
  116. // let bool_v1 = Arc::new(AtomicBool::new(true));
  117. //
  118. // let (res_sender, mut res_receiver): (Sender<ResponseData>, Receiver<ResponseData>) = channel(1024);
  119. // let mut params: BTreeMap<String, String> = BTreeMap::new();
  120. // let access_key = env::var("binance_access_key").unwrap_or("".to_string());
  121. // let secret_key = env::var("binance_secret_key").unwrap_or("".to_string());
  122. // params.insert("access_key".to_string(), access_key);
  123. // params.insert("secret_key".to_string(), secret_key);
  124. // let mut exchange_wss;
  125. // exchange_wss = BinanceSpotWs::new_label(name, false, params, BinanceSpotWsType::PublicAndPrivate, res_sender);
  126. // exchange_wss.set_subscribe(subscriber_type.into());
  127. //
  128. // let t1 = tokio::spawn(async move {
  129. // exchange_wss.custom_subscribe(bool_v1, vec![symbol_format]).await;
  130. // });
  131. // let mold_arc = Arc::new(mold.to_string());
  132. // let t2 = tokio::spawn(async move {
  133. // let mold_clone = Arc::clone(&mold_arc);
  134. // loop {
  135. // tokio::time::sleep(Duration::from_millis(1)).await;
  136. // if let Ok(received) = res_receiver.try_recv() {
  137. // match mold_clone.as_str() {
  138. // "depth" => {
  139. // if received.data != "" {
  140. // let result = binance_handle::handle_special_depth(received);
  141. // trace!(?result)
  142. // }
  143. // }
  144. // "ticker" => {
  145. // if received.data != "" {
  146. // let result = binance_handle::handle_special_ticker(received);
  147. // trace!(?result)
  148. // }
  149. // }
  150. // _ => {
  151. // error!("没有该命令!mode={}", mold_clone);
  152. // panic!("没有该命令!mode={}", mold_clone)
  153. // }
  154. // }
  155. // }
  156. // }
  157. // });
  158. // try_join!(t1, t2).unwrap();
  159. error!("该交易所不支持!test_new_exchange_wss:{:?}",exchange);
  160. panic!("该交易所不支持!test_new_exchange_wss:{:?}", exchange)
  161. }
  162. ExchangeEnum::BinanceSwap => {
  163. error!("该交易所不支持!test_new_exchange_wss:{:?}",exchange);
  164. panic!("该交易所不支持!test_new_exchange_wss:{:?}", exchange)
  165. }
  166. ExchangeEnum::GateSwap => {
  167. error!("该交易所不支持!test_new_exchange_wss:{:?}",exchange);
  168. panic!("该交易所不支持!test_new_exchange_wss:{:?}", exchange)
  169. }
  170. ExchangeEnum::KucoinSwap => {
  171. // let symbol_format = format!("{}M", utils::format_symbol(symbol.to_string(), ""));
  172. // let symbol_back = utils::format_symbol(symbol.to_string(), "_");
  173. // let name = format!("kucoin_usdt_swap@{}", symbol.to_string().to_lowercase());
  174. // let bool_v1 = Arc::new(AtomicBool::new(true));
  175. //
  176. // let (res_sender, mut res_receiver): (Sender<ResponseData>, Receiver<ResponseData>) = channel(1024);
  177. // let mut params: BTreeMap<String, String> = BTreeMap::new();
  178. // let access_key = env::var("kucoin_access_key").unwrap_or("".to_string());
  179. // let secret_key = env::var("kucoin_secret_key").unwrap_or("".to_string());
  180. // let pass_key = env::var("kucoin_pass_key").unwrap_or("".to_string());
  181. // params.insert("access_key".to_string(), access_key);
  182. // params.insert("secret_key".to_string(), secret_key);
  183. // params.insert("pass_key".to_string(), pass_key);
  184. // let mut exchange_wss;
  185. // if ["depth", "ticker"].contains(&mold) {
  186. // exchange_wss = KucoinSwapWs::new_label(name, false, params, KucoinWsType::Public, res_sender).await
  187. // } else {
  188. // exchange_wss = KucoinSwapWs::new_label(name, false, params, KucoinWsType::Private, res_sender).await
  189. // }
  190. // exchange_wss.set_subscribe(subscriber_type.into());
  191. //
  192. // let t1 = tokio::spawn(async move {
  193. // exchange_wss.custom_subscribe(bool_v1, vec![symbol_format]).await;
  194. // });
  195. // let mold_arc = Arc::new(mold.to_string());
  196. // let t2 = tokio::spawn(async move {
  197. // let mold_clone = Arc::clone(&mold_arc);
  198. // loop {
  199. // tokio::time::sleep(Duration::from_millis(1)).await;
  200. // if let Ok(received) = res_receiver.try_recv() {
  201. // match mold_clone.as_str() {
  202. // "depth" => {
  203. // let result = kucoin_handle::handle_special_depth(received);
  204. // trace!(?result)
  205. // }
  206. // "ticker" => {
  207. // let result = kucoin_handle::handle_special_ticker(received);
  208. // trace!(?result)
  209. // }
  210. // "account" => {
  211. // trace!(?received);
  212. // let result = kucoin_handle::handle_account_info(received, symbol_back.clone());
  213. // trace!(?result)
  214. // }
  215. // "position" => {
  216. // trace!(?received);
  217. // let result = kucoin_handle::handle_position(received, dec!(1));
  218. // trace!(?result)
  219. // }
  220. // "orders" => {
  221. // trace!(?received);
  222. // let result = kucoin_handle::handle_order(received, dec!(0.001));
  223. // trace!(?result)
  224. // }
  225. // _ => {
  226. // error!("没有该命令!mode={}", mold_clone);
  227. // panic!("没有该命令!mode={}", mold_clone)
  228. // }
  229. // }
  230. // }
  231. // }
  232. // });
  233. // try_join!(t1, t2).unwrap();
  234. }
  235. ExchangeEnum::KucoinSpot => {
  236. let symbol_format = utils::format_symbol(symbol.to_string(), "-").to_uppercase();
  237. let symbol_back = utils::format_symbol(symbol.to_string(), "_");
  238. trace!(symbol_format);
  239. let name = format!("kucoin_usdt_spot@{}", symbol.to_string().to_lowercase());
  240. let bool_v1 = Arc::new(AtomicBool::new(true));
  241. let (res_sender, mut res_receiver): (Sender<ResponseData>, Receiver<ResponseData>) = channel(1024);
  242. let mut params: BTreeMap<String, String> = BTreeMap::new();
  243. let access_key = env::var("kucoin_access_key").unwrap_or("".to_string());
  244. let secret_key = env::var("kucoin_secret_key").unwrap_or("".to_string());
  245. let pass_key = env::var("kucoin_pass_key").unwrap_or("".to_string());
  246. params.insert("access_key".to_string(), access_key);
  247. params.insert("secret_key".to_string(), secret_key);
  248. params.insert("pass_key".to_string(), pass_key);
  249. let mut exchange_wss = if ["depth", "ticker"].contains(&mold) {
  250. KucoinSpotWs::new_label(name, false, params, KucoinWsType::Public, res_sender).await
  251. } else {
  252. KucoinSpotWs::new_label(name, false, params, KucoinWsType::Private, res_sender).await
  253. };
  254. exchange_wss.set_subscribe(subscriber_type.into());
  255. let t1 = tokio::spawn(async move {
  256. exchange_wss.custom_subscribe(bool_v1, vec![symbol_format]).await;
  257. });
  258. let mold_arc = Arc::new(mold.to_string());
  259. let t2 = tokio::spawn(async move {
  260. let mold_clone = Arc::clone(&mold_arc);
  261. loop {
  262. tokio::time::sleep(Duration::from_millis(1)).await;
  263. if let Ok(received) = res_receiver.try_recv() {
  264. trace!(?received);
  265. match mold_clone.as_str() {
  266. "depth" => {
  267. if received.data != "" {
  268. let result = kucoin_spot_handle::handle_special_depth(received);
  269. trace!(?result)
  270. }
  271. }
  272. "ticker" => {
  273. if received.data != "" {
  274. let result = kucoin_spot_handle::handle_special_ticker(received);
  275. trace!(?result)
  276. }
  277. }
  278. "account" => {
  279. if received.data != "" {
  280. let result = kucoin_spot_handle::handle_account_info(received, symbol_back.clone());
  281. trace!(?result)
  282. }
  283. }
  284. "orders" => {
  285. if received.data != "" {
  286. let result = kucoin_spot_handle::handle_order(received, dec!(1));
  287. trace!(?result)
  288. }
  289. }
  290. _ => {
  291. error!("没有该命令!mode={}", mold_clone);
  292. panic!("没有该命令!mode={}", mold_clone)
  293. }
  294. }
  295. }
  296. }
  297. });
  298. try_join!(t1, t2).unwrap();
  299. }
  300. ExchangeEnum::BitgetSpot => {
  301. // let symbol_format = utils::format_symbol(symbol.to_string(), "-").to_uppercase();
  302. // let symbol_back = utils::format_symbol(symbol.to_string(), "_");
  303. // trace!(symbol_format);
  304. // let name = format!("bitget_usdt_spot@{}", symbol.to_string().to_lowercase());
  305. // let bool_v1 = Arc::new(AtomicBool::new(true));
  306. //
  307. // let (res_sender, mut res_receiver): (Sender<ResponseData>, Receiver<ResponseData>) = channel(1024);
  308. // let mut params: BTreeMap<String, String> = BTreeMap::new();
  309. // let access_key = env::var("bitget_access_key").unwrap_or("".to_string());
  310. // let secret_key = env::var("bitget_secret_key").unwrap_or("".to_string());
  311. // let pass_key = env::var("bitget_pass_key").unwrap_or("".to_string());
  312. // params.insert("access_key".to_string(), access_key);
  313. // params.insert("secret_key".to_string(), secret_key);
  314. // params.insert("pass_key".to_string(), pass_key);
  315. //
  316. // let mut exchange_wss = if ["depth", "ticker"].contains(&mold) {
  317. // BitgetSpotWs::new_label(name, false, params, BitgetWsType::Public, res_sender)
  318. // } else {
  319. // BitgetSpotWs::new_label(name, false, params, BitgetWsType::Private, res_sender)
  320. // };
  321. //
  322. // exchange_wss.set_subscribe(subscriber_type.into());
  323. //
  324. // let t1 = tokio::spawn(async move {
  325. // exchange_wss.custom_subscribe(bool_v1, vec![symbol_format]).await;
  326. // });
  327. // let mold_arc = Arc::new(mold.to_string());
  328. // let t2 = tokio::spawn(async move {
  329. // let mold_clone = Arc::clone(&mold_arc);
  330. // loop {
  331. // tokio::time::sleep(Duration::from_millis(1)).await;
  332. // if let Ok(received) = res_receiver.try_recv() {
  333. // match mold_clone.as_str() {
  334. // "depth" => {
  335. // if received.data != "" {
  336. // let result = bitget_spot_handle::handle_special_depth(received);
  337. // trace!(?result)
  338. // }
  339. // }
  340. // "ticker" => {
  341. // if received.data != "" {
  342. // let result = bitget_spot_handle::handle_special_ticker(received);
  343. // trace!(?result)
  344. // }
  345. // }
  346. // "account" => {
  347. // if received.data != "" {
  348. // let result = bitget_spot_handle::handle_account_info(received, symbol_back.clone());
  349. // trace!(?result)
  350. // }
  351. // }
  352. // "orders" => {
  353. // if received.data != "" {
  354. // let result = bitget_spot_handle::handle_order(received, dec!(1));
  355. // trace!(?result)
  356. // }
  357. // }
  358. // _ => {
  359. // error!("没有该命令!mode={}", mold_clone);
  360. // panic!("没有该命令!mode={}", mold_clone)
  361. // }
  362. // }
  363. // }
  364. // }
  365. // });
  366. // try_join!(t1, t2).unwrap();
  367. }
  368. ExchangeEnum::OkxSwap => {
  369. // let symbol_format = utils::format_symbol(symbol.to_string(), "-").to_uppercase();
  370. // let symbol_back = utils::format_symbol(symbol.to_string(), "_");
  371. // trace!(symbol_format);
  372. // let name = format!("okx_usdt_swap@{}", symbol.to_string().to_lowercase());
  373. // let bool_v1 = Arc::new(AtomicBool::new(true));
  374. //
  375. // let (res_sender, mut res_receiver): (Sender<ResponseData>, Receiver<ResponseData>) = channel(1024);
  376. // let mut params: BTreeMap<String, String> = BTreeMap::new();
  377. // let access_key = env::var("okx_access_key").unwrap_or("".to_string());
  378. // let secret_key = env::var("okx_secret_key").unwrap_or("".to_string());
  379. // let passphrase = env::var("okx_passphrase").unwrap_or("".to_string());
  380. // params.insert("access_key".to_string(), access_key);
  381. // params.insert("secret_key".to_string(), secret_key);
  382. // params.insert("pass_key".to_string(), passphrase);
  383. //
  384. // let mut exchange_wss = if ["depth", "ticker"].contains(&mold) {
  385. // OkxSwapWs::new_label(name, false, params, OkxWsType::Public, res_sender)
  386. // } else if ["account", "orders", "position"].contains(&mold) {
  387. // OkxSwapWs::new_label(name, false, params, OkxWsType::Private, res_sender)
  388. // } else {
  389. // OkxSwapWs::new_label(name, false, params, OkxWsType::Business, res_sender)
  390. // };
  391. //
  392. // exchange_wss.set_subscribe(subscriber_type.into());
  393. //
  394. // let t1 = tokio::spawn(async move {
  395. // exchange_wss.custom_subscribe(bool_v1, vec![symbol_format]).await;
  396. // });
  397. // let mold_arc = Arc::new(mold.to_string());
  398. // let t2 = tokio::spawn(async move {
  399. // let mold_clone = Arc::clone(&mold_arc);
  400. // loop {
  401. // tokio::time::sleep(Duration::from_millis(1)).await;
  402. // if let Ok(received) = res_receiver.try_recv() {
  403. // match mold_clone.as_str() {
  404. // "depth" => {
  405. // if received.data != "" {
  406. // let result = okx_handle::handle_special_depth(received);
  407. // trace!(?result)
  408. // }
  409. // }
  410. // "ticker" => {
  411. // if received.data != "" {
  412. // let result = okx_handle::handle_special_ticker(received);
  413. // trace!(?result)
  414. // }
  415. // }
  416. // "account" => {
  417. // if received.data != "" {
  418. // let result = okx_handle::handle_account_info(received, symbol_back.clone());
  419. // trace!(?result)
  420. // }
  421. // }
  422. // "position" => {
  423. // if received.data != "" {
  424. // let result = okx_handle::handle_position(received, dec!(10));
  425. // trace!(?result)
  426. // }
  427. // }
  428. // "orders" => {
  429. // if received.data != "" {
  430. // let result = okx_handle::handle_order(received, dec!(10));
  431. // trace!(?result)
  432. // }
  433. // }
  434. // _ => {
  435. // error!("没有该命令!mode={}", mold_clone);
  436. // panic!("没有该命令!mode={}", mold_clone)
  437. // }
  438. // }
  439. // }
  440. // }
  441. // });
  442. // try_join!(t1, t2).unwrap();
  443. }
  444. _ => {
  445. error!("该交易所不支持!test_new_exchange_wss:{:?}",exchange);
  446. panic!("该交易所不支持!test_new_exchange_wss:{:?}", exchange)
  447. }
  448. }
  449. }