socket_tool_test.rs 2.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
  1. use std::collections::BTreeMap;
  2. use std::sync::Arc;
  3. use std::sync::atomic::AtomicBool;
  4. use std::time::Duration;
  5. use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
  6. use futures_util::StreamExt;
  7. use tokio::sync::mpsc::{channel, Sender};
  8. use tokio::try_join;
  9. use tokio_tungstenite::client_async;
  10. use tokio_tungstenite::tungstenite::{Error, Message};
  11. use tracing::trace;
  12. use exchanges::binance_swap_rest::BinanceSwapRest;
  13. use exchanges::binance_swap_ws::{BinanceSwapSubscribeType, BinanceSwapWs, BinanceSwapWsType};
  14. use exchanges::response_base::ResponseData;
  15. use exchanges::socket_tool::{client, ws_connect_async};
  16. //ws-订阅公共频道信息
  17. #[tokio::test(flavor = "multi_thread", worker_threads = 5)]
  18. async fn ws_custom_subscribe() {
  19. global::log_utils::init_log_with_trace();
  20. // // 币安订阅
  21. let base_url = "wss://fstream.binance.com/stream?streams=btcusdt@depth20@100ms".to_string();
  22. // // okx 公共订阅
  23. // let str = serde_json::json!({
  24. // "op": "subscribe",
  25. // "args": [
  26. // {
  27. // "channel":"books5",
  28. // "instId":"BTC-USDT"
  29. // }
  30. // ]
  31. // });
  32. //
  33. // //创建通道,发送指令通道, 与读取推送数据通道
  34. // let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  35. // let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded();
  36. //
  37. //
  38. // //对象
  39. // // let model = OkxSwapModel::new();
  40. // let is_ok = ws_connect_async(base_url).await;
  41. // //写数据,订阅频道,有些是Url跟参数,有些是单独发送订阅字符串
  42. // // write_tx.unbounded_send(Message::Text(str.to_string())).expect("发送失败");
  43. // match is_ok {
  44. // Ok(_) => {
  45. // let t1 = tokio::spawn(async move {
  46. // loop {
  47. // if let Some(message) = read_rx.next().await {
  48. //
  49. // }
  50. // }
  51. // });
  52. // try_join!(t1).unwrap();
  53. // }
  54. // _ => {}
  55. // }
  56. client(base_url).await;
  57. }
  58. //
  59. // fn get_ws(btree_map: BTreeMap<String, String>, tx: Sender<ResponseData>) -> BinanceSwapWs {
  60. // let binance_ws = BinanceSwapWs::new(false,
  61. // btree_map,
  62. // BinanceSwapWsType::PublicAndPrivate);
  63. // binance_ws
  64. // }