socket_tool.rs 24 KB


  1. use std::net::{IpAddr, Ipv4Addr, SocketAddr};
  2. use std::sync::Arc;
  3. use std::sync::atomic::{AtomicBool, Ordering};
  4. use std::time::Duration;
  5. use chrono::Utc;
  6. use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
  7. use futures_util::{future, pin_mut, SinkExt, StreamExt};
  8. use futures_util::stream::{SplitSink, SplitStream};
  9. use ring::hmac;
  10. use serde_json::json;
  11. use tokio::net::TcpStream;
  12. use tokio::sync::Mutex;
  13. use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream};
  14. use tokio_tungstenite::tungstenite::{Error, Message};
  15. use tracing::{trace};
  16. use crate::proxy;
  17. use crate::proxy::{ProxyEnum, ProxyResponseEnum};
  18. use crate::response_base::ResponseData;
  19. #[derive(Debug)]
  20. pub enum HeartbeatType {
  21. Ping,
  22. Pong,
  23. Custom(String),
  24. }
  25. pub struct AbstractWsMode {}
  26. impl AbstractWsMode {
  27. //创建链接
  28. pub async fn ws_connect_async<T, PI, PO>(bool_v1: Arc<AtomicBool>,
  29. address_url: String,
  30. lable: String,
  31. subscribe_array: Vec<String>,
  32. mut write_rx: UnboundedReceiver<Message>,
  33. read_tx: UnboundedSender<ResponseData>,
  34. message_text: T,
  35. message_ping: PI,
  36. message_pong: PO) -> Result<(), Error>
  37. where T: Fn(String) -> Option<ResponseData> + Copy,
  38. PI: Fn(Vec<u8>) -> Option<ResponseData> + Copy,
  39. PO: Fn(Vec<u8>) -> Option<ResponseData> + Copy
  40. {
  41. //1.是否走代理
  42. /*******走代理:根据环境变量配置来决定,如果配置了走代理,没有配置不走*******/
  43. let proxy = match proxy::ParsingDetail::env_proxy(ProxyEnum::WS) {
  44. ProxyResponseEnum::NO => {
  45. // trace!("非 代理");
  46. None
  47. }
  48. ProxyResponseEnum::YES(proxy) => {
  49. // trace!("代理");
  50. Option::from(proxy)
  51. }
  52. };
  53. loop {
  54. match connect_async(address_url.clone(), proxy).await {
  55. Ok((ws_stream, _)) => {
  56. trace!("WebSocket 握手完成。");
  57. let (write, mut read) = ws_stream.split();
  58. let write_arc = Arc::new(Mutex::new(write));
  59. let write_clone = Arc::clone(&write_arc);
  60. //订阅写入(包括订阅信息 )
  61. trace!("订阅内容:{:?}",subscribe_array.clone());
  62. for s in &subscribe_array {
  63. let mut write_lock = write_clone.lock().await;
  64. write_lock.send(Message::Text(s.parse().unwrap())).await?;
  65. }
  66. //将socket 的写操作与 写通道链接起来,将数据以ok的结构体封装进行传递
  67. // let stdin_to_ws = write_rx.map(Ok).forward(write);
  68. // Writing task
  69. let write_clone2 = Arc::clone(&write_arc);
  70. let stdin_to_ws = async {
  71. while let Some(message) = write_rx.next().await {
  72. let mut write_lock2 = write_clone2.lock().await;
  73. write_lock2.send(message).await?;
  74. }
  75. Ok::<(), tokio_tungstenite::tungstenite::Error>(())
  76. };
  77. let write_clone3 = Arc::clone(&write_arc);
  78. let ws_to_stdout = async {
  79. while let Some(message) = read.next().await {
  80. if !bool_v1.load(Ordering::Relaxed) {
  81. continue;
  82. }
  83. let mut write_lock3 = write_clone3.lock().await;
  84. let response_data = AbstractWsMode::analysis_message(message, message_text, message_ping, message_pong);
  85. // let response_data = func(message);
  86. if response_data.is_some() {
  87. let data = response_data.unwrap();
  88. if data.code == "200" {
  89. let mut data_c = data.clone();
  90. data_c.time = chrono::Utc::now().timestamp_micros();
  91. data_c.label = lable.clone();
  92. // if data_c.label.contains("gate_usdt_swap") {
  93. // if data_c.channel == "futures.order_book" {
  94. // if read_tx.len() == 0 {
  95. // read_tx.unbounded_send(data_c).unwrap();
  96. // }
  97. // } else {
  98. // read_tx.unbounded_send(data_c).unwrap();
  99. // }
  100. // } else if data_c.label.contains("binance_usdt_swap") {
  101. // if data_c.channel == "bookTicker" {
  102. // if read_tx.len() == 0 {
  103. // read_tx.unbounded_send(data_c).unwrap();
  104. // }
  105. // } else {
  106. // read_tx.unbounded_send(data_c).unwrap();
  107. // }
  108. // } else if data_c.label.contains("bybit_usdt_swap") {
  109. // if data_c.channel == "orderbook" {
  110. // if read_tx.len() == 0 {
  111. // read_tx.unbounded_send(data_c).unwrap();
  112. // }
  113. // } else {
  114. // read_tx.unbounded_send(data_c).unwrap();
  115. // }
  116. // } else {
  117. if read_tx.len() == 0 {
  118. read_tx.unbounded_send(data_c).unwrap();
  119. }
  120. // }
  121. }
  122. let code = data.code.clone();
  123. /*
  124. 200 -正确返回
  125. -200 -登录成功
  126. -201 -订阅成功
  127. -300 -客户端收到服务器心跳ping,需要响应
  128. -301 -客户端收到服务器心跳pong,需要响应
  129. -302 -客户端收到服务器心跳自定义,需要响应自定义
  130. */
  131. match code.as_str() {
  132. "200" => {
  133. }
  134. "-200" => {
  135. //登录成功
  136. trace!("登录成功:{:?}", data);
  137. }
  138. "-201" => {
  139. //订阅成功
  140. trace!("订阅成功:{:?}", data);
  141. }
  142. "-300" => {
  143. //服务器发送心跳 ping 给客户端,客户端需要pong回应
  144. trace!("服务器响应-ping");
  145. if data.data.len() > 0 {
  146. write_lock3.send(Message::Pong(Vec::from(data.data))).await?;
  147. trace!("客户端回应服务器-pong");
  148. }
  149. }
  150. "-301" => {
  151. //服务器发送心跳 pong 给客户端,客户端需要ping回应
  152. trace!("服务器响应-pong");
  153. if data.data.len() > 0 {
  154. write_lock3.send(Message::Ping(Vec::from(data.data))).await?;
  155. trace!("客户端回应服务器-ping");
  156. }
  157. }
  158. "-302" => {
  159. //客户端收到服务器心跳自定义,需要响应自定义
  160. trace!("特定字符心跳,特殊响应:{:?}", data);
  161. write_lock3.send(Message::Text(data.data)).await?;
  162. trace!("特殊字符心跳-回应完成");
  163. }
  164. _ => {
  165. trace!("未知:{:?}",data);
  166. }
  167. }
  168. }
  169. }
  170. Ok::<(), tokio_tungstenite::tungstenite::Error>(())
  171. };
  172. //必须操作。,因为不同于其他的高级语言,有自动内存管理,所以为了防范地址改变,所以需要做此处理
  173. pin_mut!(stdin_to_ws, ws_to_stdout,);
  174. future::select(stdin_to_ws, ws_to_stdout).await;
  175. }
  176. Err(e) => {
  177. trace!("WebSocket 握手失败:{:?}",e);
  178. }
  179. }
  180. trace!("---5");
  181. trace!("---4");
  182. trace!("重启...");
  183. // let (ws_stream, _) = connect_async(address_url.clone(), proxy).await.unwrap();
  184. // trace!("WebSocket 握手完成。");
  185. // let (write, mut read) = ws_stream.split();
  186. //
  187. // let write_arc = Arc::new(Mutex::new(write));
  188. // let write_clone = Arc::clone(&write_arc);
  189. // //订阅写入(包括订阅信息 )
  190. // trace!("订阅内容:{:?}",subscribe_array.clone());
  191. // for s in &subscribe_array {
  192. // let mut write_lock = write_clone.lock().await;
  193. // write_lock.send(Message::Text(s.parse().unwrap())).await?;
  194. // }
  195. //
  196. // //将socket 的写操作与 写通道链接起来,将数据以ok的结构体封装进行传递
  197. // // let stdin_to_ws = write_rx.map(Ok).forward(write);
  198. // // Writing task
  199. // let write_clone2 = Arc::clone(&write_arc);
  200. // let stdin_to_ws = async {
  201. // while let Some(message) = write_rx.next().await {
  202. // let mut write_lock2 = write_clone2.lock().await;
  203. // write_lock2.send(message).await?;
  204. // }
  205. // Ok::<(), tokio_tungstenite::tungstenite::Error>(())
  206. // };
  207. // let write_clone3 = Arc::clone(&write_arc);
  208. // let ws_to_stdout = async {
  209. // while let Some(message) = read.next().await {
  210. // let mut write_lock3 = write_clone3.lock().await;
  211. // let response_data = AbstractWsMode::analysis_message(message, message_text, message_ping, message_pong);
  212. // // let response_data = func(message);
  213. // if response_data.is_some() {
  214. // let mut data = response_data.unwrap();
  215. // data.label = lable.clone();
  216. // let code = data.code.clone();
  217. // /*
  218. // 200 -正确返回
  219. // -200 -登录成功
  220. // -201 -订阅成功
  221. // -300 -客户端收到服务器心跳ping,需要响应
  222. // -301 -客户端收到服务器心跳pong,需要响应
  223. // -302 -客户端收到服务器心跳自定义,需要响应自定义
  224. // */
  225. // match code.as_str() {
  226. // "200" => {
  227. // if bool_v1.load(Ordering::Relaxed) {
  228. // read_tx.unbounded_send(data).unwrap();
  229. // }
  230. // }
  231. // "-200" => {
  232. // //登录成功
  233. // trace!("登录成功:{:?}", data);
  234. // }
  235. // "-201" => {
  236. // //订阅成功
  237. // trace!("订阅成功:{:?}", data);
  238. // }
  239. // "-300" => {
  240. // //服务器发送心跳 ping 给客户端,客户端需要pong回应
  241. // trace!("服务器响应-ping");
  242. // if data.data.len() > 0 {
  243. // write_lock3.send(Message::Pong(Vec::from(data.data))).await?;
  244. // trace!("客户端回应服务器-pong");
  245. // }
  246. // }
  247. // "-301" => {
  248. // //服务器发送心跳 pong 给客户端,客户端需要ping回应
  249. // trace!("服务器响应-pong");
  250. // if data.data.len() > 0 {
  251. // write_lock3.send(Message::Ping(Vec::from(data.data))).await?;
  252. // trace!("客户端回应服务器-ping");
  253. // }
  254. // }
  255. // "-302" => {
  256. // //客户端收到服务器心跳自定义,需要响应自定义
  257. // trace!("特定字符心跳,特殊响应:{:?}", data);
  258. // write_lock3.send(Message::Text(data.data)).await?;
  259. // trace!("特殊字符心跳-回应完成");
  260. // }
  261. // _ => {
  262. // trace!("未知:{:?}",data);
  263. // }
  264. // }
  265. // }
  266. // }
  267. // Ok::<(), tokio_tungstenite::tungstenite::Error>(())
  268. // };
  269. //
  270. // //必须操作。,因为不同于其他的高级语言,有自动内存管理,所以为了防范地址改变,所以需要做此处理
  271. // pin_mut!(stdin_to_ws, ws_to_stdout,);
  272. // future::select(stdin_to_ws, ws_to_stdout).await;
  273. // trace!("---5");
  274. // trace!("---4");
  275. // trace!("重启...");
  276. }
  277. // return Ok(());
  278. }
  279. //心跳包
  280. pub async fn ping_or_pong(write_tx_clone: Arc<Mutex<UnboundedSender<Message>>>, h_type: HeartbeatType, millis: u64) {
  281. loop {
  282. tokio::time::sleep(Duration::from_millis(millis)).await;
  283. let write_tx_clone = write_tx_clone.lock().await;
  284. write_tx_clone.unbounded_send(
  285. match h_type {
  286. HeartbeatType::Ping => {
  287. Message::Ping(Vec::from("Ping"))
  288. }
  289. HeartbeatType::Pong => {
  290. Message::Pong(Vec::from("Pong"))
  291. }
  292. HeartbeatType::Custom(ref str) => {
  293. Message::Text(str.parse().unwrap())
  294. }
  295. }
  296. ).expect("发送失败");
  297. trace!("发送指令-心跳:{:?}",h_type);
  298. }
  299. }
  300. //数据解析
  301. pub fn analysis_message<T, PI, PO>(message: Result<Message, Error>,
  302. message_text: T,
  303. message_ping: PI,
  304. message_pong: PO) -> Option<ResponseData>
  305. where T: Fn(String) -> Option<ResponseData>,
  306. PI: Fn(Vec<u8>) -> Option<ResponseData>,
  307. PO: Fn(Vec<u8>) -> Option<ResponseData>
  308. {
  309. match message {
  310. Ok(Message::Text(text)) => message_text(text),
  311. Ok(Message::Ping(pi)) => message_ping(pi),
  312. Ok(Message::Pong(po)) => message_pong(po),
  313. Ok(Message::Binary(s)) => {
  314. //二进制WebSocket消息
  315. let message_str = format!("Binary:{:?}", s);
  316. trace!("{:?}",message_str);
  317. Option::from(ResponseData::new("".to_string(),
  318. "2".to_string(),
  319. message_str, "".to_string()))
  320. }
  321. Ok(Message::Close(c)) => {
  322. let message_str = format!("关闭指令:{:?}", c);
  323. trace!("{:?}",message_str);
  324. Option::from(ResponseData::new("".to_string(),
  325. "0".to_string(),
  326. message_str, "".to_string()))
  327. }
  328. Ok(Message::Frame(f)) => {
  329. //原始帧 正常读取数据不会读取到该 信息类型
  330. let message_str = format!("意外读取到原始帧:{:?}", f);
  331. trace!("{:?}",message_str);
  332. Option::from(ResponseData::new("".to_string(),
  333. "-2".to_string(),
  334. message_str, "".to_string()))
  335. }
  336. Err(e) => {
  337. let message_str = format!("服务器响应:{:?}", e);
  338. trace!("{:?}",message_str);
  339. Option::from(ResponseData::new("".to_string(),
  340. "-1".to_string(),
  341. message_str, "".to_string()))
  342. }
  343. }
  344. }
  345. //发送数据
  346. pub async fn send_subscribe(write_tx_clone: Arc<Mutex<UnboundedSender<Message>>>, message: Message) -> bool {
  347. let write_tx_clone = write_tx_clone.lock().await;
  348. write_tx_clone.unbounded_send(message.clone()).unwrap();
  349. trace!("发送指令:{:?}",message);
  350. true
  351. }
  352. }
  353. //创建链接
  354. pub async fn ws_connect_async(address_url: String) -> (SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
  355. SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>) {
  356. //1.是否走代理
  357. /*******走代理:根据环境变量配置来决定,如果配置了走代理,没有配置不走*******/
  358. let proxy = match proxy::ParsingDetail::env_proxy(ProxyEnum::WS) {
  359. ProxyResponseEnum::NO => {
  360. trace!("非 代理");
  361. None
  362. }
  363. ProxyResponseEnum::YES(proxy) => {
  364. trace!("代理");
  365. Option::from(proxy)
  366. }
  367. };
  368. let (ws_stream, _) = connect_async(address_url, proxy).await.expect("链接失败!");
  369. trace!("WebSocket 握手完成。");
  370. ws_stream.split()
  371. }
  372. pub async fn client(add_url: String) {
  373. let proxy = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(
  374. 127,
  375. 0,
  376. 0,
  377. 1)
  378. ), 7890);
  379. //创建通道 开启线程,向通道写入数据
  380. let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  381. let (read_tx, read_rx) = futures_channel::mpsc::unbounded();
  382. tokio::spawn(write_sell(write_tx));
  383. //创建socket,,并且读写分离
  384. let (ws_stream, _) = connect_async(add_url, Option::from(proxy)).await.expect("Failed to connect");
  385. trace!("WebSocket handshake has been successfully completed");
  386. let (write, read) = ws_stream.split();
  387. //将socket 的写操作与 写通道链接起来,将数据以ok的结构体封装进行传递
  388. let stdin_to_ws = write_rx.map(Ok).forward(write);
  389. let ws_to_stdout = {
  390. trace!("---1");
  391. //读,循环读取,然后拿到 message,,然后开启异步处理 message,
  392. let result = read.for_each(|message| async {
  393. read_tx.unbounded_send(message.unwrap()).unwrap();
  394. });
  395. trace!("---3");
  396. result
  397. };
  398. tokio::spawn(read_sell(read_rx));
  399. //必须操作。,因为不同于其他的高级语言,有自动内存管理,所以为了防范地址改变,所以需要做此处理
  400. pin_mut!(stdin_to_ws, ws_to_stdout);
  401. future::select(stdin_to_ws, ws_to_stdout).await;
  402. }
  403. //模拟 业务场景中 发送指令给目标交易所
  404. async fn write_sell(tx: futures_channel::mpsc::UnboundedSender<Message>) {
  405. let _str = serde_json::json!({
  406. "op": "subscribe",
  407. "args": [
  408. {
  409. // "channel":"orders",
  410. // "instType":"SWAP",
  411. // "instFamily":"BTC-USDT"
  412. "channel":"books5",
  413. "instId":"BTC-USDT"
  414. }
  415. ]
  416. });
  417. let str_array: Vec<String> = vec![
  418. // log_in_to_str(),
  419. // str.to_string(),
  420. ];
  421. let i = 0;
  422. loop {
  423. if str_array.len() > i {
  424. let send_str = str_array.get(i).unwrap();
  425. tx.unbounded_send(Message::Text(send_str.to_string())).unwrap();
  426. }
  427. tokio::time::sleep(Duration::from_secs(5)).await;
  428. tx.unbounded_send(Message::Ping(Vec::from("Ping"))).unwrap();
  429. tx.unbounded_send(Message::Ping(Vec::from("Pong"))).unwrap();
  430. }
  431. }
  432. async fn read_sell(mut rx: futures_channel::mpsc::UnboundedReceiver<Message>) {
  433. loop {
  434. if let Some(message) = rx.next().await {
  435. match message {
  436. Message::Text(s) => {
  437. trace!("Text: {}", s);
  438. }
  439. Message::Binary(s) => {
  440. trace!("Binary: {:?}", s);
  441. }
  442. Message::Ping(s) => {
  443. trace!("Ping: {:?}", s);
  444. }
  445. Message::Pong(s) => {
  446. trace!("Pong: {:?}", s);
  447. }
  448. Message::Close(s) => {
  449. trace!("Close: {:?}", s);
  450. }
  451. Message::Frame(s) => {
  452. trace!("Frame: {:?}", s);
  453. }
  454. }
  455. }
  456. tokio::time::sleep(Duration::from_millis(1)).await
  457. }
  458. }
  459. pub fn log_in_to_str() -> String {
  460. let mut login_json_str = "".to_string();
  461. let access_key: String = "".to_string();
  462. let secret_key: String = "".to_string();
  463. let passphrase: String = "".to_string();
  464. if access_key.len() > 0 || secret_key.len() > 0 || passphrase.len() > 0 {
  465. let timestamp = Utc::now().timestamp().to_string();
  466. // 时间戳 + 请求类型+ 请求参数字符串
  467. let message = format!("{}GET{}", timestamp, "/users/self/verify");
  468. let hmac_key = ring::hmac::Key::new(hmac::HMAC_SHA256, secret_key.as_bytes());
  469. let result = ring::hmac::sign(&hmac_key, &message.as_bytes());
  470. let sign = base64::encode(result);
  471. let login_json = json!({
  472. "op": "login",
  473. "args": [{
  474. "apiKey": access_key,
  475. "passphrase": passphrase,
  476. "timestamp": timestamp,
  477. "sign": sign }]
  478. });
  479. // trace!("---login_json:{0}", login_json.to_string());
  480. // trace!("--登陆:{}", login_json.to_string());
  481. login_json_str = login_json.to_string();
  482. }
  483. login_json_str
  484. }