// bingx_swap_test.rs
  1. use std::sync::Arc;
  2. use std::sync::atomic::AtomicBool;
  3. use futures_util::StreamExt;
  4. use tokio::sync::Mutex;
  5. use tracing::trace;
  6. use exchanges::bingx_swap_ws::{BingxSwapLogin, BingxSwapSubscribeType, BingxSwapWs, BingxSwapWsType};
  7. use exchanges::response_base::ResponseData;
  // API credentials handed to `BingxSwapLogin` by the test below.
  // They are intentionally blank in the source; presumably real values are
  // filled in locally before running against the exchange — never commit them.

  /// API access key (blank placeholder).
  const ACCESS_KEY: &str = "";
  /// API secret key (blank placeholder).
  const SECRET_KEY: &str = "";
  /// API passphrase (blank placeholder).
  const PASS_KEY: &str = "";
  11. #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
  12. async fn ws_custom_subscribe() {
  13. global::log_utils::init_log_with_trace();
  14. let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  15. let (_, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
  16. // let (write_tx, write_rx) = tokio::sync::broadcast::channel::<Message>(10);
  17. // let (read_tx, mut read_rx) = tokio::sync::broadcast::channel::<ResponseData>(10);
  18. let write_tx_am = Arc::new(Mutex::new(write_tx));
  19. let is_shutdown_arc = Arc::new(AtomicBool::new(true));
  20. //读取
  21. let _is_shutdown_arc_clone = Arc::clone(&is_shutdown_arc);
  22. let _tr = tokio::spawn(async move {
  23. trace!("线程-数据读取-开启");
  24. loop {
  25. // 从通道中接收并丢弃所有的消息,直到通道为空
  26. while let Ok(Some(_)) = read_rx.try_next() {
  27. // 从通道中接收并丢弃所有的消息,直到通道为空
  28. while let Ok(Some(_)) = read_rx.try_next() {
  29. // 消息被忽略
  30. }
  31. }
  32. }
  33. // trace!("线程-数据读取-结束");
  34. });
  35. //写数据
  36. // let bool_v2_clone = Arc::clone(&is_shutdown_arc);
  37. // let write_tx_clone = Arc::clone(&write_tx_am);
  38. // let su = ws.get_subscription();
  39. // let tw = tokio::spawn(async move {
  40. // trace!("线程-数据写入-开始");
  41. // loop {
  42. // tokio::time::sleep(Duration::from_millis(20 * 1000)).await;
  43. // // let close_frame = CloseFrame {
  44. // // code: CloseCode::Normal,
  45. // // reason: Cow::Borrowed("Bye bye"),
  46. // // };
  47. // // let message = Message::Close(Some(close_frame));
  48. //
  49. //
  50. // let message = Message::Text(su.clone());
  51. // AbstractWsMode::send_subscribe(write_tx_clone.clone(), message.clone()).await;
  52. // trace!("发送指令成功");
  53. // }
  54. // trace!("线程-数据写入-结束");
  55. // });
  56. let fun = move |data: ResponseData| {
  57. async move {
  58. trace!("---传入的方法~~~~{:?}", data);
  59. }
  60. };
  61. let param = BingxSwapLogin {
  62. access_key: ACCESS_KEY.to_string(),
  63. secret_key: SECRET_KEY.to_string(),
  64. pass_key: PASS_KEY.to_string(),
  65. };
  66. let t1 = tokio::spawn(async move {
  67. let mut ws = get_ws(Option::from(param), BingxSwapWsType::PublicAndPrivate);
  68. ws.set_symbols(vec!["BTC_USDT".to_string(), "ETC_USDT".to_string()]);
  69. ws.set_subscribe(vec![
  70. BingxSwapSubscribeType::PuFuturesTrades,
  71. BingxSwapSubscribeType::PuFuturesDepth,
  72. BingxSwapSubscribeType::PuFuturesRecords,
  73. ]);
  74. //链接
  75. let bool_v3_clone = Arc::clone(&is_shutdown_arc);
  76. ws.ws_connect_async(bool_v3_clone, fun, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  77. trace!("test 唯一线程结束--");
  78. });
  79. tokio::try_join!(t1).unwrap();
  80. trace!("当此结束");
  81. trace!("重启!");
  82. trace!("参考交易所关闭");
  83. return;
  84. }
  85. fn get_ws(btree_map: Option<BingxSwapLogin>, ws_type: BingxSwapWsType) -> BingxSwapWs {
  86. let Bingx_ws = BingxSwapWs::new(false,btree_map, ws_type);
  87. Bingx_ws
  88. }