use std::collections::BTreeMap; use std::sync::Arc; use std::sync::atomic::AtomicBool; use std::time::Duration; use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender}; use futures_util::StreamExt; use tokio::sync::mpsc::{channel, Sender}; use tokio::try_join; use tokio_tungstenite::client_async; use tokio_tungstenite::tungstenite::{Error, Message}; use tracing::trace; use exchanges::binance_swap_rest::BinanceSwapRest; use exchanges::binance_swap_ws::{BinanceSwapSubscribeType, BinanceSwapWs, BinanceSwapWsType}; use exchanges::response_base::ResponseData; use exchanges::socket_tool::{client, ws_connect_async}; //ws-订阅公共频道信息 #[tokio::test(flavor = "multi_thread", worker_threads = 5)] async fn ws_custom_subscribe() { global::log_utils::init_log_with_trace(); // // 币安订阅 let base_url = "wss://fstream.binance.com/stream?streams=btcusdt@depth20@100ms".to_string(); // // okx 公共订阅 // let str = serde_json::json!({ // "op": "subscribe", // "args": [ // { // "channel":"books5", // "instId":"BTC-USDT" // } // ] // }); // // //创建通道,发送指令通道, 与读取推送数据通道 // let (write_tx, write_rx) = futures_channel::mpsc::unbounded(); // let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded(); // // // //对象 // // let model = OkxSwapModel::new(); // let is_ok = ws_connect_async(base_url).await; // //写数据,订阅频道,有些是Url跟参数,有些是单独发送订阅字符串 // // write_tx.unbounded_send(Message::Text(str.to_string())).expect("发送失败"); // match is_ok { // Ok(_) => { // let t1 = tokio::spawn(async move { // loop { // if let Some(message) = read_rx.next().await { // // } // } // }); // try_join!(t1).unwrap(); // } // _ => {} // } client(base_url).await; } // // fn get_ws(btree_map: BTreeMap, tx: Sender) -> BinanceSwapWs { // let binance_ws = BinanceSwapWs::new(false, // btree_map, // BinanceSwapWsType::PublicAndPrivate); // binance_ws // }