crypto_spot_test.rs 2.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. use std::sync::Arc;
  2. use std::sync::atomic::AtomicBool;
  3. use futures_util::StreamExt;
  4. use tokio::sync::Mutex;
  5. use tracing::trace;
  6. use exchanges::crypto_spot_ws::{CryptoSpotLogin, CryptoSpotWs, CryptoSpotWsType};
  7. const ACCESS_KEY: &str = "";
  8. const SECRET_KEY: &str = "";
  9. //ws-订阅公共频道信息
  10. #[tokio::test(flavor = "multi_thread", worker_threads = 5)]
  11. async fn ws_pu() {
  12. global::log_utils::init_log_with_trace();
  13. let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  14. let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded();
  15. let mut ws = get_ws(None,CryptoSpotWsType::Public);
  16. // ws.set_symbols(vec!["BTC_USDT".to_string()]);
  17. ws.set_subscribe(vec![
  18. // CryptoSpotSubscribeType::PuBookTicker,
  19. // BinanceSwapSubscribeType::PuAggTrade,
  20. // BinanceSwapSubscribeType::PuDepth20levels100ms,
  21. ]);
  22. let write_tx_am = Arc::new(Mutex::new(write_tx));
  23. let bool_v1 = Arc::new(AtomicBool::new(true));
  24. //读取
  25. let _bool_v1_clone = Arc::clone(&bool_v1);
  26. let _tr = tokio::spawn(async move {
  27. trace!("线程-数据读取-开启");
  28. loop {
  29. if let Some(data) = read_rx.next().await {
  30. trace!("读取数据data:{:?}",data)
  31. }
  32. }
  33. // trace!("线程-数据读取-结束");
  34. });
  35. //写数据
  36. // let bool_v2_clone = Arc::clone(&bool_v1);
  37. // let write_tx_clone = Arc::clone(&write_tx_am);
  38. // let su = ws.get_subscription();
  39. // let tw = tokio::spawn(async move {
  40. // trace!("线程-数据写入-开始");
  41. // loop {
  42. // tokio::time::sleep(Duration::from_millis(20 * 1000)).await;
  43. // // let close_frame = CloseFrame {
  44. // // code: CloseCode::Normal,
  45. // // reason: Cow::Borrowed("Bye bye"),
  46. // // };
  47. // // let message = Message::Close(Some(close_frame));
  48. //
  49. //
  50. // let message = Message::Text(su.clone());
  51. // AbstractWsMode::send_subscribe(write_tx_clone.clone(), message.clone()).await;
  52. // trace!("发送指令成功");
  53. // }
  54. // trace!("线程-数据写入-结束");
  55. // });
  56. let t1 = tokio::spawn(async move {
  57. //链接
  58. let bool_v3_clone = Arc::clone(&bool_v1);
  59. ws.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  60. trace!("test 唯一线程结束--");
  61. });
  62. tokio::try_join!(t1).unwrap();
  63. trace!("当此结束");
  64. trace!("重启!");
  65. trace!("参考交易所关闭");
  66. return;
  67. }
  68. fn get_ws(btree_map: Option< CryptoSpotLogin>,ws_type:CryptoSpotWsType) -> CryptoSpotWs {
  69. let binance_ws = CryptoSpotWs::new(false,
  70. btree_map,
  71. ws_type);
  72. binance_ws
  73. }