crypto_spot_test.rs 2.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. use std::sync::Arc;
  2. use std::sync::atomic::AtomicBool;
  3. use futures_util::StreamExt;
  4. use tokio::sync::Mutex;
  5. use tracing::trace;
  6. use exchanges::crypto_spot_ws::{CryptoSpotLogin, CryptoSpotSubscribeType, CryptoSpotWs, CryptoSpotWsType};
  7. const ACCESS_KEY: &str = "";
  8. const SECRET_KEY: &str = "";
  9. //ws-订阅公共频道信息
  10. #[tokio::test(flavor = "multi_thread", worker_threads = 5)]
  11. async fn ws_pu() {
  12. global::log_utils::init_log_with_trace();
  13. let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  14. let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded();
  15. let mut ws = get_ws(None, CryptoSpotWsType::Public);
  16. ws.set_symbols(vec!["BTC_USD".to_string()]);
  17. ws.set_subscribe(vec![
  18. // CryptoSpotSubscribeType::PuBook,
  19. // CryptoSpotSubscribeType::PuTicker,
  20. // CryptoSpotSubscribeType::PuTrade,
  21. CryptoSpotSubscribeType::PuCandlestick,
  22. ]);
  23. let write_tx_am = Arc::new(Mutex::new(write_tx));
  24. let bool_v1 = Arc::new(AtomicBool::new(true));
  25. //读取
  26. let _bool_v1_clone = Arc::clone(&bool_v1);
  27. let _tr = tokio::spawn(async move {
  28. trace!("线程-数据读取-开启");
  29. loop {
  30. if let Some(data) = read_rx.next().await {
  31. trace!("读取数据data:{:?}",data)
  32. }
  33. }
  34. // trace!("线程-数据读取-结束");
  35. });
  36. //写数据
  37. // let bool_v2_clone = Arc::clone(&bool_v1);
  38. // let write_tx_clone = Arc::clone(&write_tx_am);
  39. // let su = ws.get_subscription();
  40. // let tw = tokio::spawn(async move {
  41. // trace!("线程-数据写入-开始");
  42. // loop {
  43. // tokio::time::sleep(Duration::from_millis(20 * 1000)).await;
  44. // // let close_frame = CloseFrame {
  45. // // code: CloseCode::Normal,
  46. // // reason: Cow::Borrowed("Bye bye"),
  47. // // };
  48. // // let message = Message::Close(Some(close_frame));
  49. //
  50. //
  51. // let message = Message::Text(su.clone());
  52. // AbstractWsMode::send_subscribe(write_tx_clone.clone(), message.clone()).await;
  53. // trace!("发送指令成功");
  54. // }
  55. // trace!("线程-数据写入-结束");
  56. // });
  57. let t1 = tokio::spawn(async move {
  58. //链接
  59. let bool_v3_clone = Arc::clone(&bool_v1);
  60. ws.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  61. trace!("test 唯一线程结束--");
  62. });
  63. tokio::try_join!(t1).unwrap();
  64. trace!("当此结束");
  65. trace!("重启!");
  66. trace!("参考交易所关闭");
  67. return;
  68. }
  69. fn get_ws(btree_map: Option<CryptoSpotLogin>, ws_type: CryptoSpotWsType) -> CryptoSpotWs {
  70. let binance_ws = CryptoSpotWs::new(false,
  71. btree_map,
  72. ws_type);
  73. binance_ws
  74. }