mexc_swap_test.rs 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. use std::collections::BTreeMap;
  2. use std::sync::Arc;
  3. use std::sync::atomic::AtomicBool;
  4. use serde_json::json;
  5. use tokio::sync::Mutex;
  6. use tracing::trace;
  7. use exchanges::mexc_swap_rest::MexcSwapRest;
  8. use exchanges::mexc_swap_ws::{MexcSwapLogin, MexcSwapSubscribeType, MexcSwapWs, MexcSwapWsType};
  9. use exchanges::response_base::ResponseData;
  10. const ACCESS_KEY: &str = "";
  11. const SECRET_KEY: &str = "";
  12. const PASS_KEY: &str = "";
  13. #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
  14. async fn ws_custom_subscribe() {
  15. global::log_utils::init_log_with_trace();
  16. let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  17. let (_, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
  18. // let (write_tx, write_rx) = tokio::sync::broadcast::channel::<Message>(10);
  19. // let (read_tx, mut read_rx) = tokio::sync::broadcast::channel::<ResponseData>(10);
  20. let write_tx_am = Arc::new(Mutex::new(write_tx));
  21. let is_shutdown_arc = Arc::new(AtomicBool::new(true));
  22. //读取
  23. let _is_shutdown_arc_clone = Arc::clone(&is_shutdown_arc);
  24. let _tr = tokio::spawn(async move {
  25. trace!("线程-数据读取-开启");
  26. loop {
  27. // 从通道中接收并丢弃所有的消息,直到通道为空
  28. while let Ok(Some(_)) = read_rx.try_next() {
  29. // 从通道中接收并丢弃所有的消息,直到通道为空
  30. while let Ok(Some(_)) = read_rx.try_next() {
  31. // 消息被忽略
  32. }
  33. }
  34. }
  35. // trace!("线程-数据读取-结束");
  36. });
  37. //写数据
  38. // let bool_v2_clone = Arc::clone(&is_shutdown_arc);
  39. // let write_tx_clone = Arc::clone(&write_tx_am);
  40. // let su = ws.get_subscription();
  41. // let tw = tokio::spawn(async move {
  42. // trace!("线程-数据写入-开始");
  43. // loop {
  44. // tokio::time::sleep(Duration::from_millis(20 * 1000)).await;
  45. // // let close_frame = CloseFrame {
  46. // // code: CloseCode::Normal,
  47. // // reason: Cow::Borrowed("Bye bye"),
  48. // // };
  49. // // let message = Message::Close(Some(close_frame));
  50. //
  51. //
  52. // let message = Message::Text(su.clone());
  53. // AbstractWsMode::send_subscribe(write_tx_clone.clone(), message.clone()).await;
  54. // trace!("发送指令成功");
  55. // }
  56. // trace!("线程-数据写入-结束");
  57. // });
  58. let fun = move |data: ResponseData| {
  59. async move {
  60. trace!("---传入的方法~~~~{:?}", data);
  61. }
  62. };
  63. let param = MexcSwapLogin {
  64. access_key: ACCESS_KEY.to_string(),
  65. secret_key: SECRET_KEY.to_string(),
  66. pass_key: PASS_KEY.to_string(),
  67. };
  68. let t1 = tokio::spawn(async move {
  69. let mut ws = get_ws(Option::from(param), MexcSwapWsType::PublicAndPrivate);
  70. ws.set_symbols(vec!["BTC_USDT".to_string(), "ETC_USDT".to_string()]);
  71. ws.set_subscribe(vec![
  72. MexcSwapSubscribeType::PuFuturesTrades,
  73. MexcSwapSubscribeType::PuFuturesDepth,
  74. MexcSwapSubscribeType::PuFuturesRecords,
  75. ]);
  76. //链接
  77. let bool_v3_clone = Arc::clone(&is_shutdown_arc);
  78. ws.ws_connect_async(bool_v3_clone, fun, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  79. trace!("test 唯一线程结束--");
  80. });
  81. tokio::try_join!(t1).unwrap();
  82. trace!("当此结束");
  83. trace!("重启!");
  84. trace!("参考交易所关闭");
  85. return;
  86. }
  87. fn get_ws(btree_map: Option<MexcSwapLogin>, ws_type: MexcSwapWsType) -> MexcSwapWs {
  88. let mexc_exc = MexcSwapWs::new(false,btree_map, ws_type);
  89. mexc_exc
  90. }
  91. //查詢合約基礎信息
  92. #[tokio::test]
  93. async fn rest_get_market_test() {
  94. global::log_utils::init_log_with_trace();
  95. let mut ret = get_rest();
  96. let req_data = ret.get_market(json!({
  97. })).await;
  98. println!("mexc--查詢合約基礎信息--{:?}", req_data);
  99. }
  100. fn get_rest() -> MexcSwapRest {
  101. let mut btree_map: BTreeMap<String, String> = BTreeMap::new();
  102. btree_map.insert("access_key".to_string(), ACCESS_KEY.to_string());
  103. btree_map.insert("secret_key".to_string(), SECRET_KEY.to_string());
  104. btree_map.insert("pass_key".to_string(), PASS_KEY.to_string());
  105. let mexc_exc = MexcSwapRest::new(false, btree_map.clone());
  106. mexc_exc
  107. }