exchange_test.rs 25 KB


  1. use std::collections::{BTreeMap};
  2. use std::{env};
  3. use std::io::Error;
  4. use std::sync::Arc;
  5. use std::sync::atomic::AtomicBool;
  6. use std::time::Duration;
  7. use rust_decimal_macros::dec;
  8. use tokio::sync::mpsc::{channel, Receiver, Sender};
  9. use tokio::try_join;
  10. use tracing::{error, trace};
  11. // use exchanges::bitget_spot_ws::{BitgetSpotWs, BitgetSubscribeType, BitgetWsType};
  12. use exchanges::kucoin_spot_ws::{KucoinSpotWs, KucoinSubscribeType, KucoinWsType};
  13. // use exchanges::binance_spot_ws::{BinanceSpotSubscribeType, BinanceSpotWs, BinanceSpotWsType};
  14. // use exchanges::okx_swap_ws::{OkxSubscribeType, OkxSwapWs, OkxWsType};
  15. // use exchanges::kucoin_swap_ws::{KucoinSubscribeType, KucoinSwapWs, KucoinWsType};
  16. use exchanges::response_base::ResponseData;
  17. use standard::exchange::{Exchange, ExchangeEnum};
  18. // use standard::{binance_handle, Order, Platform, utils};
  19. // use standard::{okx_handle, Order, Platform, utils};
  20. // use standard::{kucoin_handle, Order, Platform, utils};
  21. use standard::{kucoin_spot_handle, Order, Platform, utils};
  22. // use standard::{bitget_spot_handle, Order, Platform, utils};
  23. // 创建实体
  24. #[allow(dead_code)]
  25. pub async fn test_new_exchange(exchange: ExchangeEnum, symbol: &str) -> Box<dyn Platform> {
  26. utils::proxy_handle();
  27. let (order_sender, _order_receiver): (Sender<Order>, Receiver<Order>) = channel(1024);
  28. let (error_sender, _error_receiver): (Sender<Error>, Receiver<Error>) = channel(1024);
  29. match exchange {
  30. ExchangeEnum::BinanceSwap => {
  31. let mut params: BTreeMap<String, String> = BTreeMap::new();
  32. let access_key = env::var("binance_access_key").unwrap_or("".to_string());
  33. let secret_key = env::var("binance_secret_key").unwrap_or("".to_string());
  34. params.insert("access_key".to_string(), access_key);
  35. params.insert("secret_key".to_string(), secret_key);
  36. Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  37. }
  38. ExchangeEnum::BinanceSpot => {
  39. let mut params: BTreeMap<String, String> = BTreeMap::new();
  40. let access_key = env::var("binance_access_key").unwrap_or("".to_string());
  41. let secret_key = env::var("binance_secret_key").unwrap_or("".to_string());
  42. params.insert("access_key".to_string(), access_key);
  43. params.insert("secret_key".to_string(), secret_key);
  44. Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  45. }
  46. ExchangeEnum::GateSwap => {
  47. let mut params: BTreeMap<String, String> = BTreeMap::new();
  48. let access_key = env::var("gate_access_key").unwrap_or("".to_string());
  49. let secret_key = env::var("gate_secret_key").unwrap_or("".to_string());
  50. params.insert("access_key".to_string(), access_key);
  51. params.insert("secret_key".to_string(), secret_key);
  52. Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  53. }
  54. ExchangeEnum::GateSpot => {
  55. let mut params: BTreeMap<String, String> = BTreeMap::new();
  56. let access_key = env::var("gate_access_key").unwrap_or("".to_string());
  57. let secret_key = env::var("gate_secret_key").unwrap_or("".to_string());
  58. params.insert("access_key".to_string(), access_key);
  59. params.insert("secret_key".to_string(), secret_key);
  60. Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  61. }
  62. ExchangeEnum::KucoinSwap => {
  63. let mut params: BTreeMap<String, String> = BTreeMap::new();
  64. let access_key = env::var("kucoin_access_key").unwrap_or("".to_string());
  65. let secret_key = env::var("kucoin_secret_key").unwrap_or("".to_string());
  66. let pass_key = env::var("kucoin_pass_key").unwrap_or("".to_string());
  67. params.insert("access_key".to_string(), access_key);
  68. params.insert("secret_key".to_string(), secret_key);
  69. params.insert("pass_key".to_string(), pass_key);
  70. Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  71. }
  72. ExchangeEnum::KucoinSpot => {
  73. let mut params: BTreeMap<String, String> = BTreeMap::new();
  74. let access_key = env::var("kucoin_access_key").unwrap_or("".to_string());
  75. let secret_key = env::var("kucoin_secret_key").unwrap_or("".to_string());
  76. let pass_key = env::var("kucoin_pass_key").unwrap_or("".to_string());
  77. params.insert("access_key".to_string(), access_key);
  78. params.insert("secret_key".to_string(), secret_key);
  79. params.insert("pass_key".to_string(), pass_key);
  80. Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  81. }
  82. ExchangeEnum::OkxSwap => {
  83. let mut params: BTreeMap<String, String> = BTreeMap::new();
  84. let access_key = env::var("okx_access_key").unwrap_or("".to_string());
  85. let secret_key = env::var("okx_secret_key").unwrap_or("".to_string());
  86. let pass_key = env::var("okx_passphrase").unwrap_or("".to_string());
  87. params.insert("access_key".to_string(), access_key);
  88. params.insert("secret_key".to_string(), secret_key);
  89. params.insert("pass_key".to_string(), pass_key);
  90. Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  91. }
  92. ExchangeEnum::BitgetSpot => {
  93. let mut params: BTreeMap<String, String> = BTreeMap::new();
  94. let access_key = env::var("bitget_access_key").unwrap_or("".to_string());
  95. let secret_key = env::var("bitget_secret_key").unwrap_or("".to_string());
  96. let pass_key = env::var("bitget_pass_key").unwrap_or("".to_string());
  97. params.insert("access_key".to_string(), access_key);
  98. params.insert("secret_key".to_string(), secret_key);
  99. params.insert("pass_key".to_string(), pass_key);
  100. Exchange::new(exchange, symbol.to_string(), false, params, order_sender, error_sender).await
  101. }
  102. }
  103. }
  104. #[allow(dead_code)]
  105. pub async fn test_new_exchange_wss<T>(exchange: ExchangeEnum, symbol: &str, subscriber_type: T, mold: &str) where Vec<KucoinSubscribeType>: From<T> {
  106. utils::proxy_handle();
  107. match exchange {
  108. ExchangeEnum::BinanceSpot => {
  109. // let symbol_format = utils::format_symbol(symbol.to_string(), "").to_uppercase();
  110. // trace!(symbol_format);
  111. // let name = format!("binance_usdt_swap@{}", symbol.to_string().to_lowercase());
  112. // let bool_v1 = Arc::new(AtomicBool::new(true));
  113. //
  114. // let (res_sender, mut res_receiver): (Sender<ResponseData>, Receiver<ResponseData>) = channel(1024);
  115. // let mut params: BTreeMap<String, String> = BTreeMap::new();
  116. // let access_key = env::var("binance_access_key").unwrap_or("".to_string());
  117. // let secret_key = env::var("binance_secret_key").unwrap_or("".to_string());
  118. // params.insert("access_key".to_string(), access_key);
  119. // params.insert("secret_key".to_string(), secret_key);
  120. // let mut exchange_wss;
  121. // exchange_wss = BinanceSpotWs::new_label(name, false, params, BinanceSpotWsType::PublicAndPrivate, res_sender);
  122. // exchange_wss.set_subscribe(subscriber_type.into());
  123. //
  124. // let t1 = tokio::spawn(async move {
  125. // exchange_wss.custom_subscribe(bool_v1, vec![symbol_format]).await;
  126. // });
  127. // let mold_arc = Arc::new(mold.to_string());
  128. // let t2 = tokio::spawn(async move {
  129. // let mold_clone = Arc::clone(&mold_arc);
  130. // loop {
  131. // tokio::time::sleep(Duration::from_millis(1)).await;
  132. // if let Ok(received) = res_receiver.try_recv() {
  133. // match mold_clone.as_str() {
  134. // "depth" => {
  135. // if received.data != "" {
  136. // let result = binance_handle::handle_special_depth(received);
  137. // trace!(?result)
  138. // }
  139. // }
  140. // "ticker" => {
  141. // if received.data != "" {
  142. // let result = binance_handle::handle_special_ticker(received);
  143. // trace!(?result)
  144. // }
  145. // }
  146. // _ => {
  147. // error!("没有该命令!mode={}", mold_clone);
  148. // panic!("没有该命令!mode={}", mold_clone)
  149. // }
  150. // }
  151. // }
  152. // }
  153. // });
  154. // try_join!(t1, t2).unwrap();
  155. error!("该交易所不支持!test_new_exchange_wss:{:?}",exchange);
  156. panic!("该交易所不支持!test_new_exchange_wss:{:?}", exchange)
  157. }
  158. ExchangeEnum::BinanceSwap => {
  159. error!("该交易所不支持!test_new_exchange_wss:{:?}",exchange);
  160. panic!("该交易所不支持!test_new_exchange_wss:{:?}", exchange)
  161. }
  162. ExchangeEnum::GateSwap => {
  163. error!("该交易所不支持!test_new_exchange_wss:{:?}",exchange);
  164. panic!("该交易所不支持!test_new_exchange_wss:{:?}", exchange)
  165. }
  166. ExchangeEnum::KucoinSwap => {
  167. // let symbol_format = format!("{}M", utils::format_symbol(symbol.to_string(), ""));
  168. // let symbol_back = utils::format_symbol(symbol.to_string(), "_");
  169. // let name = format!("kucoin_usdt_swap@{}", symbol.to_string().to_lowercase());
  170. // let bool_v1 = Arc::new(AtomicBool::new(true));
  171. //
  172. // let (res_sender, mut res_receiver): (Sender<ResponseData>, Receiver<ResponseData>) = channel(1024);
  173. // let mut params: BTreeMap<String, String> = BTreeMap::new();
  174. // let access_key = env::var("kucoin_access_key").unwrap_or("".to_string());
  175. // let secret_key = env::var("kucoin_secret_key").unwrap_or("".to_string());
  176. // let pass_key = env::var("kucoin_pass_key").unwrap_or("".to_string());
  177. // params.insert("access_key".to_string(), access_key);
  178. // params.insert("secret_key".to_string(), secret_key);
  179. // params.insert("pass_key".to_string(), pass_key);
  180. // let mut exchange_wss;
  181. // if ["depth", "ticker"].contains(&mold) {
  182. // exchange_wss = KucoinSwapWs::new_label(name, false, params, KucoinWsType::Public, res_sender).await
  183. // } else {
  184. // exchange_wss = KucoinSwapWs::new_label(name, false, params, KucoinWsType::Private, res_sender).await
  185. // }
  186. // exchange_wss.set_subscribe(subscriber_type.into());
  187. //
  188. // let t1 = tokio::spawn(async move {
  189. // exchange_wss.custom_subscribe(bool_v1, vec![symbol_format]).await;
  190. // });
  191. // let mold_arc = Arc::new(mold.to_string());
  192. // let t2 = tokio::spawn(async move {
  193. // let mold_clone = Arc::clone(&mold_arc);
  194. // loop {
  195. // tokio::time::sleep(Duration::from_millis(1)).await;
  196. // if let Ok(received) = res_receiver.try_recv() {
  197. // match mold_clone.as_str() {
  198. // "depth" => {
  199. // let result = kucoin_handle::handle_special_depth(received);
  200. // trace!(?result)
  201. // }
  202. // "ticker" => {
  203. // let result = kucoin_handle::handle_special_ticker(received);
  204. // trace!(?result)
  205. // }
  206. // "account" => {
  207. // trace!(?received);
  208. // let result = kucoin_handle::handle_account_info(received, symbol_back.clone());
  209. // trace!(?result)
  210. // }
  211. // "position" => {
  212. // trace!(?received);
  213. // let result = kucoin_handle::handle_position(received, dec!(1));
  214. // trace!(?result)
  215. // }
  216. // "orders" => {
  217. // trace!(?received);
  218. // let result = kucoin_handle::handle_order(received, dec!(0.001));
  219. // trace!(?result)
  220. // }
  221. // _ => {
  222. // error!("没有该命令!mode={}", mold_clone);
  223. // panic!("没有该命令!mode={}", mold_clone)
  224. // }
  225. // }
  226. // }
  227. // }
  228. // });
  229. // try_join!(t1, t2).unwrap();
  230. }
  231. ExchangeEnum::KucoinSpot => {
  232. let symbol_format = utils::format_symbol(symbol.to_string(), "-").to_uppercase();
  233. let symbol_back = utils::format_symbol(symbol.to_string(), "_");
  234. trace!(symbol_format);
  235. let name = format!("kucoin_usdt_spot@{}", symbol.to_string().to_lowercase());
  236. let bool_v1 = Arc::new(AtomicBool::new(true));
  237. let (res_sender, mut res_receiver): (Sender<ResponseData>, Receiver<ResponseData>) = channel(1024);
  238. let mut params: BTreeMap<String, String> = BTreeMap::new();
  239. let access_key = env::var("kucoin_access_key").unwrap_or("".to_string());
  240. let secret_key = env::var("kucoin_secret_key").unwrap_or("".to_string());
  241. let pass_key = env::var("kucoin_pass_key").unwrap_or("".to_string());
  242. params.insert("access_key".to_string(), access_key);
  243. params.insert("secret_key".to_string(), secret_key);
  244. params.insert("pass_key".to_string(), pass_key);
  245. let mut exchange_wss = if ["depth", "ticker"].contains(&mold) {
  246. KucoinSpotWs::new_label(name, false, params, KucoinWsType::Public, res_sender).await
  247. } else {
  248. KucoinSpotWs::new_label(name, false, params, KucoinWsType::Private, res_sender).await
  249. };
  250. exchange_wss.set_subscribe(subscriber_type.into());
  251. let t1 = tokio::spawn(async move {
  252. exchange_wss.custom_subscribe(bool_v1, vec![symbol_format]).await;
  253. });
  254. let mold_arc = Arc::new(mold.to_string());
  255. let t2 = tokio::spawn(async move {
  256. let mold_clone = Arc::clone(&mold_arc);
  257. loop {
  258. tokio::time::sleep(Duration::from_millis(1)).await;
  259. if let Ok(received) = res_receiver.try_recv() {
  260. trace!(?received);
  261. match mold_clone.as_str() {
  262. "depth" => {
  263. if received.data != "" {
  264. let result = kucoin_spot_handle::handle_special_depth(received);
  265. trace!(?result)
  266. }
  267. }
  268. "ticker" => {
  269. if received.data != "" {
  270. let result = kucoin_spot_handle::handle_special_ticker(received);
  271. trace!(?result)
  272. }
  273. }
  274. "account" => {
  275. if received.data != "" {
  276. let result = kucoin_spot_handle::handle_account_info(received, symbol_back.clone());
  277. trace!(?result)
  278. }
  279. }
  280. "orders" => {
  281. if received.data != "" {
  282. let result = kucoin_spot_handle::handle_order(received, dec!(1));
  283. trace!(?result)
  284. }
  285. }
  286. _ => {
  287. error!("没有该命令!mode={}", mold_clone);
  288. panic!("没有该命令!mode={}", mold_clone)
  289. }
  290. }
  291. }
  292. }
  293. });
  294. try_join!(t1, t2).unwrap();
  295. }
  296. ExchangeEnum::BitgetSpot => {
  297. // let symbol_format = utils::format_symbol(symbol.to_string(), "-").to_uppercase();
  298. // let symbol_back = utils::format_symbol(symbol.to_string(), "_");
  299. // trace!(symbol_format);
  300. // let name = format!("bitget_usdt_spot@{}", symbol.to_string().to_lowercase());
  301. // let bool_v1 = Arc::new(AtomicBool::new(true));
  302. //
  303. // let (res_sender, mut res_receiver): (Sender<ResponseData>, Receiver<ResponseData>) = channel(1024);
  304. // let mut params: BTreeMap<String, String> = BTreeMap::new();
  305. // let access_key = env::var("bitget_access_key").unwrap_or("".to_string());
  306. // let secret_key = env::var("bitget_secret_key").unwrap_or("".to_string());
  307. // let pass_key = env::var("bitget_pass_key").unwrap_or("".to_string());
  308. // params.insert("access_key".to_string(), access_key);
  309. // params.insert("secret_key".to_string(), secret_key);
  310. // params.insert("pass_key".to_string(), pass_key);
  311. //
  312. // let mut exchange_wss = if ["depth", "ticker"].contains(&mold) {
  313. // BitgetSpotWs::new_label(name, false, params, BitgetWsType::Public, res_sender)
  314. // } else {
  315. // BitgetSpotWs::new_label(name, false, params, BitgetWsType::Private, res_sender)
  316. // };
  317. //
  318. // exchange_wss.set_subscribe(subscriber_type.into());
  319. //
  320. // let t1 = tokio::spawn(async move {
  321. // exchange_wss.custom_subscribe(bool_v1, vec![symbol_format]).await;
  322. // });
  323. // let mold_arc = Arc::new(mold.to_string());
  324. // let t2 = tokio::spawn(async move {
  325. // let mold_clone = Arc::clone(&mold_arc);
  326. // loop {
  327. // tokio::time::sleep(Duration::from_millis(1)).await;
  328. // if let Ok(received) = res_receiver.try_recv() {
  329. // match mold_clone.as_str() {
  330. // "depth" => {
  331. // if received.data != "" {
  332. // let result = bitget_spot_handle::handle_special_depth(received);
  333. // trace!(?result)
  334. // }
  335. // }
  336. // "ticker" => {
  337. // if received.data != "" {
  338. // let result = bitget_spot_handle::handle_special_ticker(received);
  339. // trace!(?result)
  340. // }
  341. // }
  342. // "account" => {
  343. // if received.data != "" {
  344. // let result = bitget_spot_handle::handle_account_info(received, symbol_back.clone());
  345. // trace!(?result)
  346. // }
  347. // }
  348. // "orders" => {
  349. // if received.data != "" {
  350. // let result = bitget_spot_handle::handle_order(received, dec!(1));
  351. // trace!(?result)
  352. // }
  353. // }
  354. // _ => {
  355. // error!("没有该命令!mode={}", mold_clone);
  356. // panic!("没有该命令!mode={}", mold_clone)
  357. // }
  358. // }
  359. // }
  360. // }
  361. // });
  362. // try_join!(t1, t2).unwrap();
  363. }
  364. ExchangeEnum::OkxSwap => {
  365. // let symbol_format = utils::format_symbol(symbol.to_string(), "-").to_uppercase();
  366. // let symbol_back = utils::format_symbol(symbol.to_string(), "_");
  367. // trace!(symbol_format);
  368. // let name = format!("okx_usdt_swap@{}", symbol.to_string().to_lowercase());
  369. // let bool_v1 = Arc::new(AtomicBool::new(true));
  370. //
  371. // let (res_sender, mut res_receiver): (Sender<ResponseData>, Receiver<ResponseData>) = channel(1024);
  372. // let mut params: BTreeMap<String, String> = BTreeMap::new();
  373. // let access_key = env::var("okx_access_key").unwrap_or("".to_string());
  374. // let secret_key = env::var("okx_secret_key").unwrap_or("".to_string());
  375. // let passphrase = env::var("okx_passphrase").unwrap_or("".to_string());
  376. // params.insert("access_key".to_string(), access_key);
  377. // params.insert("secret_key".to_string(), secret_key);
  378. // params.insert("pass_key".to_string(), passphrase);
  379. //
  380. // let mut exchange_wss = if ["depth", "ticker"].contains(&mold) {
  381. // OkxSwapWs::new_label(name, false, params, OkxWsType::Public, res_sender)
  382. // } else if ["account", "orders", "position"].contains(&mold) {
  383. // OkxSwapWs::new_label(name, false, params, OkxWsType::Private, res_sender)
  384. // } else {
  385. // OkxSwapWs::new_label(name, false, params, OkxWsType::Business, res_sender)
  386. // };
  387. //
  388. // exchange_wss.set_subscribe(subscriber_type.into());
  389. //
  390. // let t1 = tokio::spawn(async move {
  391. // exchange_wss.custom_subscribe(bool_v1, vec![symbol_format]).await;
  392. // });
  393. // let mold_arc = Arc::new(mold.to_string());
  394. // let t2 = tokio::spawn(async move {
  395. // let mold_clone = Arc::clone(&mold_arc);
  396. // loop {
  397. // tokio::time::sleep(Duration::from_millis(1)).await;
  398. // if let Ok(received) = res_receiver.try_recv() {
  399. // match mold_clone.as_str() {
  400. // "depth" => {
  401. // if received.data != "" {
  402. // let result = okx_handle::handle_special_depth(received);
  403. // trace!(?result)
  404. // }
  405. // }
  406. // "ticker" => {
  407. // if received.data != "" {
  408. // let result = okx_handle::handle_special_ticker(received);
  409. // trace!(?result)
  410. // }
  411. // }
  412. // "account" => {
  413. // if received.data != "" {
  414. // let result = okx_handle::handle_account_info(received, symbol_back.clone());
  415. // trace!(?result)
  416. // }
  417. // }
  418. // "position" => {
  419. // if received.data != "" {
  420. // let result = okx_handle::handle_position(received, dec!(10));
  421. // trace!(?result)
  422. // }
  423. // }
  424. // "orders" => {
  425. // if received.data != "" {
  426. // let result = okx_handle::handle_order(received, dec!(10));
  427. // trace!(?result)
  428. // }
  429. // }
  430. // _ => {
  431. // error!("没有该命令!mode={}", mold_clone);
  432. // panic!("没有该命令!mode={}", mold_clone)
  433. // }
  434. // }
  435. // }
  436. // }
  437. // });
  438. // try_join!(t1, t2).unwrap();
  439. }
  440. _ => {
  441. error!("该交易所不支持!test_new_exchange_wss:{:?}",exchange);
  442. panic!("该交易所不支持!test_new_exchange_wss:{:?}", exchange)
  443. }
  444. }
  445. }