kucoin_swap_test_async.rs 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. use std::sync::Arc;
  2. use std::sync::atomic::AtomicBool;
  3. use futures_util::StreamExt;
  4. use tokio::sync::Mutex;
  5. use tracing::trace;
  6. use exchanges::kucoin_swap_ws_async::{KucoinSwapLogin, KucoinSwapSubscribeType, KucoinSwapWs, KucoinSwapWsType};
  // Credentials copied into `KucoinSwapLogin` by `ws_custom_subscribe_pr`.
  // All three are empty strings in this file.
  7. const ACCESS_KEY: &str = "";
  8. const SECRET_KEY: &str = "";
  9. const PASS_KEY: &str = "";
  10. //ws-订阅公共频道信息
  11. #[tokio::test(flavor = "multi_thread", worker_threads = 5)]
  12. async fn ws_custom_subscribe_pu() {
  13. global::log_utils::init_log_with_trace();
  14. let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  15. let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded();
  16. // let (write_tx, write_rx) = tokio::sync::broadcast::channel::<Message>(10);
  17. // let (read_tx, mut read_rx) = tokio::sync::broadcast::channel::<ResponseData>(10);
  18. let mut ws = get_ws(None, KucoinSwapWsType::Public).await;
  19. ws.set_symbols(vec!["xbt_usdtM".to_string()]);
  20. ws.set_subscribe(vec![
  21. KucoinSwapSubscribeType::PuContractMarketLevel2Depth50,
  22. KucoinSwapSubscribeType::PuContractMarketExecution,
  23. KucoinSwapSubscribeType::PuContractMarkettickerV2,
  24. ]);
  25. let write_tx_am = Arc::new(Mutex::new(write_tx));
  26. let bool_v1 = Arc::new(AtomicBool::new(true));
  27. //读取
  28. let bool_v1_clone = Arc::clone(&bool_v1);
  29. let tr = tokio::spawn(async move {
  30. trace!("线程-数据读取-开启");
  31. loop {
  32. if let Some(data) = read_rx.next().await {
  33. trace!("读取数据data:{:?}",data)
  34. }
  35. }
  36. trace!("线程-数据读取-结束");
  37. });
  38. //写数据
  39. let bool_v2_clone = Arc::clone(&bool_v1);
  40. let write_tx_clone = Arc::clone(&write_tx_am);
  41. // let tw = tokio::spawn(async move {
  42. // trace!("线程-数据写入-开始");
  43. // loop {
  44. // tokio::time::sleep(Duration::from_millis(20*1000)).await;
  45. // let close_frame = CloseFrame {
  46. // code: CloseCode::Normal,
  47. // reason: Cow::Borrowed("Bye bye"),
  48. // };
  49. // let close_message = Message::Close(Some(close_frame));
  50. // // AbstractWsMode::send_subscribe(write_tx_clone.clone(), Message::Text("32313221".to_string()));
  51. // AbstractWsMode::send_subscribe(write_tx_clone.clone(), close_message);
  52. // trace!("发送指令成功");
  53. // }
  54. // trace!("线程-数据写入-结束");
  55. // });
  56. // loop {
  57. let t1 = tokio::spawn(async move {
  58. //链接
  59. let bool_v3_clone = Arc::clone(&bool_v1);
  60. ws.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  61. trace!("test 唯一线程结束--");
  62. });
  63. tokio::try_join!(t1).unwrap();
  64. trace!("当此结束");
  65. // }
  66. trace!("重启!");
  67. trace!("参考交易所关闭");
  68. return;
  69. }
  70. //ws-订阅私有频道信息
  71. #[tokio::test(flavor = "multi_thread", worker_threads = 5)]
  72. async fn ws_custom_subscribe_pr() {
  73. global::log_utils::init_log_with_trace();
  74. //对象
  75. let btree_map = KucoinSwapLogin {
  76. access_key: ACCESS_KEY.to_string(),
  77. secret_key: SECRET_KEY.to_string(),
  78. pass_key: PASS_KEY.to_string(),
  79. };
  80. let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  81. let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded();
  82. // let (write_tx, write_rx) = tokio::sync::broadcast::channel::<Message>(10);
  83. // let (read_tx, mut read_rx) = tokio::sync::broadcast::channel::<ResponseData>(10);
  84. let mut ws = get_ws(Option::from(btree_map), KucoinSwapWsType::Private).await;
  85. ws.set_symbols(vec!["xbt_usdtM".to_string()]);
  86. ws.set_subscribe(vec![
  87. // KucoinSwapSubscribeType::PuContractMarketLevel2Depth50,
  88. // KucoinSwapSubscribeType::PuContractMarketExecution,
  89. KucoinSwapSubscribeType::PuContractMarkettickerV2,
  90. KucoinSwapSubscribeType::PrContractAccountWallet,
  91. KucoinSwapSubscribeType::PrContractPosition,
  92. KucoinSwapSubscribeType::PrContractMarketTradeOrdersSys,
  93. KucoinSwapSubscribeType::PrContractMarketTradeOrders,
  94. ]);
  95. let write_tx_am = Arc::new(Mutex::new(write_tx));
  96. let bool_v1 = Arc::new(AtomicBool::new(true));
  97. //读取
  98. let bool_v1_clone = Arc::clone(&bool_v1);
  99. let tr = tokio::spawn(async move {
  100. trace!("线程-数据读取-开启");
  101. loop {
  102. if let Some(data) = read_rx.next().await {
  103. trace!("读取数据data:{:?}",data)
  104. }
  105. }
  106. trace!("线程-数据读取-结束");
  107. });
  108. //写数据
  109. let bool_v2_clone = Arc::clone(&bool_v1);
  110. let write_tx_clone = Arc::clone(&write_tx_am);
  111. // let tw = tokio::spawn(async move {
  112. // trace!("线程-数据写入-开始");
  113. // loop {
  114. // tokio::time::sleep(Duration::from_millis(20*1000)).await;
  115. // let close_frame = CloseFrame {
  116. // code: CloseCode::Normal,
  117. // reason: Cow::Borrowed("Bye bye"),
  118. // };
  119. // let close_message = Message::Close(Some(close_frame));
  120. // // AbstractWsMode::send_subscribe(write_tx_clone.clone(), Message::Text("32313221".to_string()));
  121. // AbstractWsMode::send_subscribe(write_tx_clone.clone(), close_message);
  122. // trace!("发送指令成功");
  123. // }
  124. // trace!("线程-数据写入-结束");
  125. // });
  126. // loop {
  127. let t1 = tokio::spawn(async move {
  128. //链接
  129. let bool_v3_clone = Arc::clone(&bool_v1);
  130. ws.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  131. trace!("test 唯一线程结束--");
  132. });
  133. tokio::try_join!(t1).unwrap();
  134. trace!("当此结束");
  135. // }
  136. trace!("重启!");
  137. trace!("参考交易所关闭");
  138. return;
  139. }
  140. async fn get_ws(btree_map: Option<KucoinSwapLogin>, type_v: KucoinSwapWsType) -> KucoinSwapWs {
  141. let ku_ws = KucoinSwapWs::new(false, btree_map,
  142. type_v).await;
  143. ku_ws
  144. }