kucoin_swap_test.rs 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. use std::sync::Arc;
  2. use std::sync::atomic::AtomicBool;
  3. use futures_util::StreamExt;
  4. use tokio::sync::Mutex;
  5. use tracing::trace;
  6. use exchanges::kucoin_swap_ws::{KucoinSwapLogin, KucoinSwapSubscribeType, KucoinSwapWs, KucoinSwapWsType};
  7. const ACCESS_KEY: &str = "";
  8. const SECRET_KEY: &str = "";
  9. const PASS_KEY: &str = "";
  10. //ws-订阅公共频道信息
  11. #[tokio::test(flavor = "multi_thread", worker_threads = 5)]
  12. async fn ws_custom_subscribe_pu() {
  13. global::log_utils::init_log_with_trace();
  14. let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  15. let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded();
  16. let mut ws = get_ws(None, KucoinSwapWsType::Public).await;
  17. ws.set_symbols(vec!["xbt_usdtM".to_string()]);
  18. ws.set_subscribe(vec![
  19. KucoinSwapSubscribeType::PuContractMarketLevel2Depth50,
  20. KucoinSwapSubscribeType::PuContractMarketExecution,
  21. KucoinSwapSubscribeType::PuContractMarkettickerV2,
  22. ]);
  23. let write_tx_am = Arc::new(Mutex::new(write_tx));
  24. let is_shutdown_arc = Arc::new(AtomicBool::new(true));
  25. //读取
  26. let _is_shutdown_arc_clone = Arc::clone(&is_shutdown_arc);
  27. let _tr = tokio::spawn(async move {
  28. trace!("线程-数据读取-开启");
  29. loop {
  30. if let Some(data) = read_rx.next().await {
  31. trace!("读取数据data:{:?}",data)
  32. }
  33. }
  34. });
  35. //写数据
  36. let _bool_v2_clone = Arc::clone(&is_shutdown_arc);
  37. let _write_tx_clone = Arc::clone(&write_tx_am);
  38. // let tw = tokio::spawn(async move {
  39. // trace!("线程-数据写入-开始");
  40. // loop {
  41. // tokio::time::sleep(Duration::from_millis(20*1000)).await;
  42. // let close_frame = CloseFrame {
  43. // code: CloseCode::Normal,
  44. // reason: Cow::Borrowed("Bye bye"),
  45. // };
  46. // let close_message = Message::Close(Some(close_frame));
  47. // // AbstractWsMode::send_subscribe(write_tx_clone.clone(), Message::Text("32313221".to_string()));
  48. // AbstractWsMode::send_subscribe(write_tx_clone.clone(), close_message);
  49. // trace!("发送指令成功");
  50. // }
  51. // trace!("线程-数据写入-结束");
  52. // });
  53. // loop {
  54. let t1 = tokio::spawn(async move {
  55. //链接
  56. let bool_v3_clone = Arc::clone(&is_shutdown_arc);
  57. ws.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  58. trace!("test 唯一线程结束--");
  59. });
  60. tokio::try_join!(t1).unwrap();
  61. trace!("当此结束");
  62. // }
  63. trace!("重启!");
  64. trace!("参考交易所关闭");
  65. return;
  66. }
  67. //ws-订阅私有频道信息
  68. #[tokio::test(flavor = "multi_thread", worker_threads = 5)]
  69. async fn ws_custom_subscribe_pr() {
  70. global::log_utils::init_log_with_trace();
  71. //对象
  72. let btree_map = KucoinSwapLogin {
  73. access_key: ACCESS_KEY.to_string(),
  74. secret_key: SECRET_KEY.to_string(),
  75. pass_key: PASS_KEY.to_string(),
  76. };
  77. let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  78. let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded();
  79. // let (write_tx, write_rx) = tokio::sync::broadcast::channel::<Message>(10);
  80. // let (read_tx, mut read_rx) = tokio::sync::broadcast::channel::<ResponseData>(10);
  81. let mut ws = get_ws(Option::from(btree_map), KucoinSwapWsType::Private).await;
  82. ws.set_symbols(vec!["xbt_usdtM".to_string()]);
  83. ws.set_subscribe(vec![
  84. // KucoinSwapSubscribeType::PuContractMarketLevel2Depth50,
  85. // KucoinSwapSubscribeType::PuContractMarketExecution,
  86. KucoinSwapSubscribeType::PuContractMarkettickerV2,
  87. KucoinSwapSubscribeType::PrContractAccountWallet,
  88. KucoinSwapSubscribeType::PrContractPosition,
  89. KucoinSwapSubscribeType::PrContractMarketTradeOrdersSys,
  90. KucoinSwapSubscribeType::PrContractMarketTradeOrders,
  91. ]);
  92. let write_tx_am = Arc::new(Mutex::new(write_tx));
  93. let is_shutdown_arc = Arc::new(AtomicBool::new(true));
  94. //读取
  95. let _is_shutdown_arc_clone = Arc::clone(&is_shutdown_arc);
  96. let _tr = tokio::spawn(async move {
  97. trace!("线程-数据读取-开启");
  98. loop {
  99. if let Some(data) = read_rx.next().await {
  100. trace!("读取数据data:{:?}",data)
  101. }
  102. }
  103. });
  104. //写数据
  105. let _bool_v2_clone = Arc::clone(&is_shutdown_arc);
  106. let _write_tx_clone = Arc::clone(&write_tx_am);
  107. // let tw = tokio::spawn(async move {
  108. // trace!("线程-数据写入-开始");
  109. // loop {
  110. // tokio::time::sleep(Duration::from_millis(20*1000)).await;
  111. // let close_frame = CloseFrame {
  112. // code: CloseCode::Normal,
  113. // reason: Cow::Borrowed("Bye bye"),
  114. // };
  115. // let close_message = Message::Close(Some(close_frame));
  116. // // AbstractWsMode::send_subscribe(write_tx_clone.clone(), Message::Text("32313221".to_string()));
  117. // AbstractWsMode::send_subscribe(write_tx_clone.clone(), close_message);
  118. // trace!("发送指令成功");
  119. // }
  120. // trace!("线程-数据写入-结束");
  121. // });
  122. // loop {
  123. let t1 = tokio::spawn(async move {
  124. //链接
  125. let bool_v3_clone = Arc::clone(&is_shutdown_arc);
  126. ws.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  127. trace!("test 唯一线程结束--");
  128. });
  129. tokio::try_join!(t1).unwrap();
  130. trace!("当此结束");
  131. // }
  132. trace!("重启!");
  133. trace!("参考交易所关闭");
  134. return;
  135. }
  136. async fn get_ws(btree_map: Option<KucoinSwapLogin>, type_v: KucoinSwapWsType) -> KucoinSwapWs {
  137. let ku_ws = KucoinSwapWs::new(false, btree_map,
  138. type_v).await;
  139. ku_ws
  140. }