socket_tool.rs 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353
  1. use std::net::{IpAddr, Ipv4Addr, SocketAddr};
  2. use std::sync::Arc;
  3. use std::sync::atomic::{AtomicBool, Ordering};
  4. use std::time::Duration;
  5. use chrono::Utc;
  6. use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
  7. use futures_util::{future, pin_mut, SinkExt, StreamExt};
  8. use futures_util::stream::{SplitSink, SplitStream};
  9. use ring::hmac;
  10. use serde_json::json;
  11. use tokio::net::TcpStream;
  12. use tokio::sync::Mutex;
  13. use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream};
  14. use tokio_tungstenite::tungstenite::{Error, Message};
  15. use tracing::trace;
  16. use crate::proxy;
  17. use crate::proxy::{ProxyEnum, ProxyResponseEnum};
  18. use crate::response_base::ResponseData;
  19. #[derive(Debug)]
  20. pub enum HeartbeatType {
  21. Ping,
  22. Pong,
  23. }
  24. pub struct AbstractWsMode {}
  25. impl AbstractWsMode {
  26. //创建链接
  27. pub async fn ws_connect_async<T, PI, PO>(bool_v1: Arc<AtomicBool>,
  28. address_url: String,
  29. lable: String,
  30. subscribe_array: Vec<String>,
  31. mut write_rx: UnboundedReceiver<Message>,
  32. read_tx: UnboundedSender<ResponseData>,
  33. message_text: T,
  34. message_ping: PI,
  35. message_pong: PO) -> Result<(), Error>
  36. where T: Fn(String) -> Option<ResponseData> + Copy,
  37. PI: Fn(Vec<u8>) -> Option<ResponseData> + Copy,
  38. PO: Fn(Vec<u8>) -> Option<ResponseData> + Copy
  39. {
  40. //1.是否走代理
  41. /*******走代理:根据环境变量配置来决定,如果配置了走代理,没有配置不走*******/
  42. let proxy = match proxy::ParsingDetail::env_proxy(ProxyEnum::WS) {
  43. ProxyResponseEnum::NO => {
  44. // trace!("非 代理");
  45. None
  46. }
  47. ProxyResponseEnum::YES(proxy) => {
  48. // trace!("代理");
  49. Option::from(proxy)
  50. }
  51. };
  52. loop {
  53. let (ws_stream, _) = connect_async(address_url.clone(), proxy).await?;
  54. trace!("WebSocket 握手完成。");
  55. let (mut write, mut read) = ws_stream.split();
  56. //订阅写入(包括订阅信息 )
  57. trace!("订阅内容:{:?}",subscribe_array.clone());
  58. for s in &subscribe_array {
  59. write.send(Message::Text(s.parse().unwrap())).await?;
  60. }
  61. //将socket 的写操作与 写通道链接起来,将数据以ok的结构体封装进行传递
  62. // let stdin_to_ws = write_rx.map(Ok).forward(write);
  63. // Writing task
  64. let stdin_to_ws = async {
  65. while let Some(message) = write_rx.next().await {
  66. write.send(message).await?;
  67. }
  68. Ok::<(), tokio_tungstenite::tungstenite::Error>(())
  69. };
  70. let ws_to_stdout = async {
  71. while let Some(message) = read.next().await {
  72. let response_data = AbstractWsMode::analysis_message(message, message_text, message_ping, message_pong);
  73. // let response_data = func(message);
  74. if response_data.is_some() {
  75. let mut data = response_data.unwrap();
  76. data.label = lable.clone();
  77. let code = data.code.clone();
  78. if code.as_str() == "-1" {} else if code.as_str() == "200" {
  79. if bool_v1.load(Ordering::Relaxed) {
  80. read_tx.unbounded_send(data).unwrap();
  81. }
  82. }
  83. }
  84. }
  85. Ok::<(), tokio_tungstenite::tungstenite::Error>(())
  86. };
  87. // let ws_to_stdout = {
  88. // trace!("---1");
  89. // //读,循环读取,然后拿到 message,,然后开启异步处理 message,
  90. // let result = read.for_each(|message| async {
  91. // let response_data = func(message);
  92. // if response_data.is_some() {
  93. // let mut data = response_data.unwrap();
  94. // data.label = lable.clone();
  95. // let code = data.code.clone();
  96. // if code.as_str() == "-1" {
  97. // // let close_frame = CloseFrame {
  98. // // code: CloseCode::Normal,
  99. // // reason: Cow::Borrowed("Bye bye"),
  100. // // };
  101. // // let close_message = Message::Close(Some(close_frame));
  102. // // write.send(close_message);
  103. // } else if code.as_str() == "200" {
  104. // read_tx.unbounded_send(data).unwrap();
  105. // }
  106. // }
  107. // });
  108. // trace!("---3");
  109. // result
  110. // };
  111. //必须操作。,因为不同于其他的高级语言,有自动内存管理,所以为了防范地址改变,所以需要做此处理
  112. pin_mut!(stdin_to_ws, ws_to_stdout,);
  113. future::select(stdin_to_ws, ws_to_stdout).await;
  114. trace!("---5");
  115. trace!("---4");
  116. trace!("重启...");
  117. }
  118. // return Ok(());
  119. }
  120. //心跳包
  121. pub async fn ping_or_pong(write_tx_clone: Arc<Mutex<UnboundedSender<Message>>>, h_type: HeartbeatType, millis: u64) {
  122. loop {
  123. tokio::time::sleep(Duration::from_millis(millis)).await;
  124. let write_tx_clone = write_tx_clone.lock().await;
  125. write_tx_clone.unbounded_send(
  126. match h_type {
  127. HeartbeatType::Ping => {
  128. Message::Ping(Vec::from("Ping"))
  129. }
  130. HeartbeatType::Pong => {
  131. Message::Pong(Vec::from("Pong"))
  132. }
  133. }
  134. ).expect("发送失败");
  135. trace!("发送指令-心跳:{:?}",h_type);
  136. }
  137. }
  138. //数据解析
  139. pub fn analysis_message<T, PI, PO>(message: Result<Message, Error>,
  140. message_text: T,
  141. message_ping: PI,
  142. message_pong: PO) -> Option<ResponseData>
  143. where T: Fn(String) -> Option<ResponseData>,
  144. PI: Fn(Vec<u8>) -> Option<ResponseData>,
  145. PO: Fn(Vec<u8>) -> Option<ResponseData>
  146. {
  147. match message {
  148. Ok(Message::Text(text)) => message_text(text),
  149. Ok(Message::Ping(pi)) => message_ping(pi),
  150. Ok(Message::Pong(po)) => message_pong(po),
  151. Ok(Message::Binary(s)) => {
  152. //二进制WebSocket消息
  153. let message_str = format!("Binary:{:?}", s);
  154. trace!("{:?}",message_str);
  155. Option::from(ResponseData::new("".to_string(),
  156. "2".to_string(),
  157. message_str, "".to_string()))
  158. }
  159. Ok(Message::Close(c)) => {
  160. let message_str = format!("关闭指令:{:?}", c);
  161. trace!("{:?}",message_str);
  162. Option::from(ResponseData::new("".to_string(),
  163. "0".to_string(),
  164. message_str, "".to_string()))
  165. }
  166. Ok(Message::Frame(f)) => {
  167. //原始帧 正常读取数据不会读取到该 信息类型
  168. let message_str = format!("意外读取到原始帧:{:?}", f);
  169. trace!("{:?}",message_str);
  170. Option::from(ResponseData::new("".to_string(),
  171. "-2".to_string(),
  172. message_str, "".to_string()))
  173. }
  174. Err(e) => {
  175. let message_str = format!("服务器响应:{:?}", e);
  176. trace!("{:?}",message_str);
  177. Option::from(ResponseData::new("".to_string(),
  178. "-1".to_string(),
  179. message_str, "".to_string()))
  180. }
  181. }
  182. }
  183. //发送数据
  184. pub async fn send_subscribe(write_tx_clone: Arc<Mutex<UnboundedSender<Message>>>, message: Message) -> bool {
  185. let write_tx_clone = write_tx_clone.lock().await;
  186. write_tx_clone.unbounded_send(message.clone()).unwrap();
  187. trace!("发送指令:{:?}",message);
  188. true
  189. }
  190. }
  191. //创建链接
  192. pub async fn ws_connect_async(address_url: String) -> (SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
  193. SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>) {
  194. //1.是否走代理
  195. /*******走代理:根据环境变量配置来决定,如果配置了走代理,没有配置不走*******/
  196. let proxy = match proxy::ParsingDetail::env_proxy(ProxyEnum::WS) {
  197. ProxyResponseEnum::NO => {
  198. trace!("非 代理");
  199. None
  200. }
  201. ProxyResponseEnum::YES(proxy) => {
  202. trace!("代理");
  203. Option::from(proxy)
  204. }
  205. };
  206. let (ws_stream, _) = connect_async(address_url, proxy).await.expect("链接失败!");
  207. trace!("WebSocket 握手完成。");
  208. ws_stream.split()
  209. }
  210. pub async fn client(add_url: String) {
  211. let proxy = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(
  212. 127,
  213. 0,
  214. 0,
  215. 1)
  216. ), 7890);
  217. //创建通道 开启线程,向通道写入数据
  218. let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  219. let (read_tx, read_rx) = futures_channel::mpsc::unbounded();
  220. tokio::spawn(write_sell(write_tx));
  221. //创建socket,,并且读写分离
  222. let (ws_stream, _) = connect_async(add_url, Option::from(proxy)).await.expect("Failed to connect");
  223. trace!("WebSocket handshake has been successfully completed");
  224. let (write, read) = ws_stream.split();
  225. //将socket 的写操作与 写通道链接起来,将数据以ok的结构体封装进行传递
  226. let stdin_to_ws = write_rx.map(Ok).forward(write);
  227. let ws_to_stdout = {
  228. trace!("---1");
  229. //读,循环读取,然后拿到 message,,然后开启异步处理 message,
  230. let result = read.for_each(|message| async {
  231. read_tx.unbounded_send(message.unwrap()).unwrap();
  232. });
  233. trace!("---3");
  234. result
  235. };
  236. tokio::spawn(read_sell(read_rx));
  237. //必须操作。,因为不同于其他的高级语言,有自动内存管理,所以为了防范地址改变,所以需要做此处理
  238. pin_mut!(stdin_to_ws, ws_to_stdout);
  239. future::select(stdin_to_ws, ws_to_stdout).await;
  240. }
  241. //模拟 业务场景中 发送指令给目标交易所
  242. async fn write_sell(tx: futures_channel::mpsc::UnboundedSender<Message>) {
  243. let _str = serde_json::json!({
  244. "op": "subscribe",
  245. "args": [
  246. {
  247. // "channel":"orders",
  248. // "instType":"SWAP",
  249. // "instFamily":"BTC-USDT"
  250. "channel":"books5",
  251. "instId":"BTC-USDT"
  252. }
  253. ]
  254. });
  255. let str_array: Vec<String> = vec![
  256. // log_in_to_str(),
  257. // str.to_string(),
  258. ];
  259. let i = 0;
  260. loop {
  261. if str_array.len() > i {
  262. let send_str = str_array.get(i).unwrap();
  263. tx.unbounded_send(Message::Text(send_str.to_string())).unwrap();
  264. }
  265. tokio::time::sleep(Duration::from_secs(5)).await;
  266. tx.unbounded_send(Message::Ping(Vec::from("Ping"))).unwrap();
  267. tx.unbounded_send(Message::Ping(Vec::from("Pong"))).unwrap();
  268. }
  269. }
  270. async fn read_sell(mut rx: futures_channel::mpsc::UnboundedReceiver<Message>) {
  271. loop {
  272. if let Some(message) = rx.next().await {
  273. match message {
  274. Message::Text(s) => {
  275. trace!("Text: {}", s);
  276. }
  277. Message::Binary(s) => {
  278. trace!("Binary: {:?}", s);
  279. }
  280. Message::Ping(s) => {
  281. trace!("Ping: {:?}", s);
  282. }
  283. Message::Pong(s) => {
  284. trace!("Pong: {:?}", s);
  285. }
  286. Message::Close(s) => {
  287. trace!("Close: {:?}", s);
  288. }
  289. Message::Frame(s) => {
  290. trace!("Frame: {:?}", s);
  291. }
  292. }
  293. }
  294. tokio::time::sleep(Duration::from_millis(1)).await
  295. }
  296. }
  297. pub fn log_in_to_str() -> String {
  298. let mut login_json_str = "".to_string();
  299. let access_key: String = "".to_string();
  300. let secret_key: String = "".to_string();
  301. let passphrase: String = "".to_string();
  302. if access_key.len() > 0 || secret_key.len() > 0 || passphrase.len() > 0 {
  303. let timestamp = Utc::now().timestamp().to_string();
  304. // 时间戳 + 请求类型+ 请求参数字符串
  305. let message = format!("{}GET{}", timestamp, "/users/self/verify");
  306. let hmac_key = ring::hmac::Key::new(hmac::HMAC_SHA256, secret_key.as_bytes());
  307. let result = ring::hmac::sign(&hmac_key, &message.as_bytes());
  308. let sign = base64::encode(result);
  309. let login_json = json!({
  310. "op": "login",
  311. "args": [{
  312. "apiKey": access_key,
  313. "passphrase": passphrase,
  314. "timestamp": timestamp,
  315. "sign": sign }]
  316. });
  317. // trace!("---login_json:{0}", login_json.to_string());
  318. // trace!("--登陆:{}", login_json.to_string());
  319. login_json_str = login_json.to_string();
  320. }
  321. login_json_str
  322. }