socket_tool.rs 25 KB


  1. use std::net::{IpAddr, Ipv4Addr, SocketAddr};
  2. use std::sync::Arc;
  3. use std::sync::atomic::{AtomicBool, Ordering};
  4. use std::time::Duration;
  5. use chrono::Utc;
  6. use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
  7. use futures_util::{future, pin_mut, SinkExt, StreamExt};
  8. use futures_util::stream::{SplitSink, SplitStream};
  9. use ring::hmac;
  10. use serde_json::json;
  11. use tokio::net::TcpStream;
  12. use tokio::{spawn};
  13. use tokio::sync::Mutex;
  14. use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream};
  15. use tokio_tungstenite::tungstenite::{Error, Message};
  16. use tracing::{trace};
  17. use crate::proxy;
  18. use crate::proxy::{ProxyEnum, ProxyResponseEnum};
  19. use crate::response_base::ResponseData;
  20. #[derive(Debug)]
  21. pub enum HeartbeatType {
  22. Ping,
  23. Pong,
  24. Custom(String),
  25. }
  26. pub struct AbstractWsMode {}
  27. impl AbstractWsMode {
  28. //创建链接
  29. pub async fn ws_connect_async<T, PI, PO>(bool_v1: Arc<AtomicBool>,
  30. address_url: String,
  31. lable: String,
  32. subscribe_array: Vec<String>,
  33. mut write_rx: UnboundedReceiver<Message>,
  34. read_tx: UnboundedSender<ResponseData>,
  35. message_text: T,
  36. message_ping: PI,
  37. message_pong: PO) -> Result<(), Error>
  38. where T: Fn(String) -> Option<ResponseData> + Copy,
  39. PI: Fn(Vec<u8>) -> Option<ResponseData> + Copy,
  40. PO: Fn(Vec<u8>) -> Option<ResponseData> + Copy
  41. {
  42. //1.是否走代理
  43. /*******走代理:根据环境变量配置来决定,如果配置了走代理,没有配置不走*******/
  44. let proxy = match proxy::ParsingDetail::env_proxy(ProxyEnum::WS) {
  45. ProxyResponseEnum::NO => {
  46. // trace!("非 代理");
  47. None
  48. }
  49. ProxyResponseEnum::YES(proxy) => {
  50. // trace!("代理");
  51. Option::from(proxy)
  52. }
  53. };
  54. let read_arc = Arc::new(Mutex::new(read_tx));
  55. let read1 = read_arc.clone();
  56. loop {
  57. match connect_async(address_url.clone(), proxy).await {
  58. Ok((ws_stream, _)) => {
  59. trace!("WebSocket 握手完成。");
  60. let (write, mut read) = ws_stream.split();
  61. let write_arc = Arc::new(Mutex::new(write));
  62. let write_clone = Arc::clone(&write_arc);
  63. //订阅写入(包括订阅信息 )
  64. trace!("订阅内容:{:?}",subscribe_array.clone());
  65. for s in &subscribe_array {
  66. let mut write_lock = write_clone.lock().await;
  67. write_lock.send(Message::Text(s.parse().unwrap())).await?;
  68. }
  69. //将socket 的写操作与 写通道链接起来,将数据以ok的结构体封装进行传递
  70. // let stdin_to_ws = write_rx.map(Ok).forward(write);
  71. // Writing task
  72. let write_clone2 = Arc::clone(&write_arc);
  73. let stdin_to_ws = async {
  74. while let Some(message) = write_rx.next().await {
  75. let mut write_lock2 = write_clone2.lock().await;
  76. write_lock2.send(message).await?;
  77. }
  78. Ok::<(), Error>(())
  79. };
  80. let write_clone3 = Arc::clone(&write_arc);
  81. let ws_to_stdout = async {
  82. while let Some(message) = read.next().await {
  83. if !bool_v1.load(Ordering::Relaxed) {
  84. continue;
  85. }
  86. let mut write_lock3 = write_clone3.lock().await;
  87. let response_data = AbstractWsMode::analysis_message(message, message_text, message_ping, message_pong);
  88. // let response_data = func(message);
  89. if response_data.is_some() {
  90. let data = response_data.unwrap();
  91. if data.code == "200" {
  92. let mut data_c = data.clone();
  93. data_c.time = chrono::Utc::now().timestamp_micros();
  94. data_c.label = lable.clone();
  95. let r = read1.clone();
  96. let read = r.lock().await;
  97. if data_c.label.contains("gate_usdt_swap") {
  98. if data_c.channel == "futures.order_book" {
  99. if read.len() == 0 {
  100. read.unbounded_send(data_c).unwrap();
  101. }
  102. } else {
  103. read.unbounded_send(data_c).unwrap();
  104. }
  105. } else if data_c.label.contains("binance_usdt_swap") {
  106. if data_c.channel == "bookTicker" {
  107. if read.len() == 0 {
  108. read.unbounded_send(data_c).unwrap();
  109. }
  110. } else {
  111. read.unbounded_send(data_c).unwrap();
  112. }
  113. } else if data_c.label.contains("bybit_usdt_swap") {
  114. if data_c.channel == "orderbook" {
  115. if read.len() == 0 {
  116. read.unbounded_send(data_c).unwrap();
  117. }
  118. } else {
  119. read.unbounded_send(data_c).unwrap();
  120. }
  121. } else {
  122. if read.len() == 0 {
  123. read.unbounded_send(data_c).unwrap();
  124. }
  125. }
  126. }
  127. let code = data.code.clone();
  128. /*
  129. 200 -正确返回
  130. -200 -登录成功
  131. -201 -订阅成功
  132. -300 -客户端收到服务器心跳ping,需要响应
  133. -301 -客户端收到服务器心跳pong,需要响应
  134. -302 -客户端收到服务器心跳自定义,需要响应自定义
  135. */
  136. match code.as_str() {
  137. "200" => {
  138. }
  139. "-200" => {
  140. //登录成功
  141. trace!("登录成功:{:?}", data);
  142. }
  143. "-201" => {
  144. //订阅成功
  145. trace!("订阅成功:{:?}", data);
  146. }
  147. "-300" => {
  148. //服务器发送心跳 ping 给客户端,客户端需要pong回应
  149. trace!("服务器响应-ping");
  150. if data.data.len() > 0 {
  151. write_lock3.send(Message::Pong(Vec::from(data.data))).await?;
  152. trace!("客户端回应服务器-pong");
  153. }
  154. }
  155. "-301" => {
  156. //服务器发送心跳 pong 给客户端,客户端需要ping回应
  157. trace!("服务器响应-pong");
  158. if data.data.len() > 0 {
  159. write_lock3.send(Message::Ping(Vec::from(data.data))).await?;
  160. trace!("客户端回应服务器-ping");
  161. }
  162. }
  163. "-302" => {
  164. //客户端收到服务器心跳自定义,需要响应自定义
  165. trace!("特定字符心跳,特殊响应:{:?}", data);
  166. write_lock3.send(Message::Text(data.data)).await?;
  167. trace!("特殊字符心跳-回应完成");
  168. }
  169. _ => {
  170. trace!("未知:{:?}",data);
  171. }
  172. }
  173. }
  174. }
  175. Ok::<(), Error>(())
  176. };
  177. // // 防止 cpu 休眠。
  178. // let read2 = read_arc.clone();
  179. // spawn(async move {
  180. // let t_str = "t".to_string();
  181. // let response = ResponseData::new(t_str.clone(),
  182. // t_str.clone(),
  183. // t_str.clone(),
  184. // t_str.clone());
  185. // loop {
  186. // time::sleep(Duration::from_nanos(1)).await;
  187. //
  188. // let t = response.clone();
  189. // let r = read2.clone();
  190. // spawn(async move {
  191. // let read = r.lock().await;
  192. //
  193. // read.unbounded_send(t.clone()).unwrap();
  194. // });
  195. // }
  196. // });
  197. //必须操作。,因为不同于其他的高级语言,有自动内存管理,所以为了防范地址改变,所以需要做此处理
  198. pin_mut!(stdin_to_ws, ws_to_stdout,);
  199. future::select(stdin_to_ws, ws_to_stdout).await;
  200. }
  201. Err(e) => {
  202. trace!("WebSocket 握手失败:{:?}",e);
  203. }
  204. }
  205. trace!("---5");
  206. trace!("---4");
  207. trace!("重启...");
  208. // let (ws_stream, _) = connect_async(address_url.clone(), proxy).await.unwrap();
  209. // trace!("WebSocket 握手完成。");
  210. // let (write, mut read) = ws_stream.split();
  211. //
  212. // let write_arc = Arc::new(Mutex::new(write));
  213. // let write_clone = Arc::clone(&write_arc);
  214. // //订阅写入(包括订阅信息 )
  215. // trace!("订阅内容:{:?}",subscribe_array.clone());
  216. // for s in &subscribe_array {
  217. // let mut write_lock = write_clone.lock().await;
  218. // write_lock.send(Message::Text(s.parse().unwrap())).await?;
  219. // }
  220. //
  221. // //将socket 的写操作与 写通道链接起来,将数据以ok的结构体封装进行传递
  222. // // let stdin_to_ws = write_rx.map(Ok).forward(write);
  223. // // Writing task
  224. // let write_clone2 = Arc::clone(&write_arc);
  225. // let stdin_to_ws = async {
  226. // while let Some(message) = write_rx.next().await {
  227. // let mut write_lock2 = write_clone2.lock().await;
  228. // write_lock2.send(message).await?;
  229. // }
  230. // Ok::<(), tokio_tungstenite::tungstenite::Error>(())
  231. // };
  232. // let write_clone3 = Arc::clone(&write_arc);
  233. // let ws_to_stdout = async {
  234. // while let Some(message) = read.next().await {
  235. // let mut write_lock3 = write_clone3.lock().await;
  236. // let response_data = AbstractWsMode::analysis_message(message, message_text, message_ping, message_pong);
  237. // // let response_data = func(message);
  238. // if response_data.is_some() {
  239. // let mut data = response_data.unwrap();
  240. // data.label = lable.clone();
  241. // let code = data.code.clone();
  242. // /*
  243. // 200 -正确返回
  244. // -200 -登录成功
  245. // -201 -订阅成功
  246. // -300 -客户端收到服务器心跳ping,需要响应
  247. // -301 -客户端收到服务器心跳pong,需要响应
  248. // -302 -客户端收到服务器心跳自定义,需要响应自定义
  249. // */
  250. // match code.as_str() {
  251. // "200" => {
  252. // if bool_v1.load(Ordering::Relaxed) {
  253. // read_tx.unbounded_send(data).unwrap();
  254. // }
  255. // }
  256. // "-200" => {
  257. // //登录成功
  258. // trace!("登录成功:{:?}", data);
  259. // }
  260. // "-201" => {
  261. // //订阅成功
  262. // trace!("订阅成功:{:?}", data);
  263. // }
  264. // "-300" => {
  265. // //服务器发送心跳 ping 给客户端,客户端需要pong回应
  266. // trace!("服务器响应-ping");
  267. // if data.data.len() > 0 {
  268. // write_lock3.send(Message::Pong(Vec::from(data.data))).await?;
  269. // trace!("客户端回应服务器-pong");
  270. // }
  271. // }
  272. // "-301" => {
  273. // //服务器发送心跳 pong 给客户端,客户端需要ping回应
  274. // trace!("服务器响应-pong");
  275. // if data.data.len() > 0 {
  276. // write_lock3.send(Message::Ping(Vec::from(data.data))).await?;
  277. // trace!("客户端回应服务器-ping");
  278. // }
  279. // }
  280. // "-302" => {
  281. // //客户端收到服务器心跳自定义,需要响应自定义
  282. // trace!("特定字符心跳,特殊响应:{:?}", data);
  283. // write_lock3.send(Message::Text(data.data)).await?;
  284. // trace!("特殊字符心跳-回应完成");
  285. // }
  286. // _ => {
  287. // trace!("未知:{:?}",data);
  288. // }
  289. // }
  290. // }
  291. // }
  292. // Ok::<(), tokio_tungstenite::tungstenite::Error>(())
  293. // };
  294. //
  295. // //必须操作。,因为不同于其他的高级语言,有自动内存管理,所以为了防范地址改变,所以需要做此处理
  296. // pin_mut!(stdin_to_ws, ws_to_stdout,);
  297. // future::select(stdin_to_ws, ws_to_stdout).await;
  298. // trace!("---5");
  299. // trace!("---4");
  300. // trace!("重启...");
  301. }
  302. // return Ok(());
  303. }
  304. //心跳包
  305. pub async fn ping_or_pong(write_tx_clone: Arc<Mutex<UnboundedSender<Message>>>, h_type: HeartbeatType, millis: u64) {
  306. loop {
  307. tokio::time::sleep(Duration::from_millis(millis)).await;
  308. let write_tx_clone = write_tx_clone.lock().await;
  309. write_tx_clone.unbounded_send(
  310. match h_type {
  311. HeartbeatType::Ping => {
  312. Message::Ping(Vec::from("Ping"))
  313. }
  314. HeartbeatType::Pong => {
  315. Message::Pong(Vec::from("Pong"))
  316. }
  317. HeartbeatType::Custom(ref str) => {
  318. Message::Text(str.parse().unwrap())
  319. }
  320. }
  321. ).expect("发送失败");
  322. trace!("发送指令-心跳:{:?}",h_type);
  323. }
  324. }
  325. //数据解析
  326. pub fn analysis_message<T, PI, PO>(message: Result<Message, Error>,
  327. message_text: T,
  328. message_ping: PI,
  329. message_pong: PO) -> Option<ResponseData>
  330. where T: Fn(String) -> Option<ResponseData>,
  331. PI: Fn(Vec<u8>) -> Option<ResponseData>,
  332. PO: Fn(Vec<u8>) -> Option<ResponseData>
  333. {
  334. match message {
  335. Ok(Message::Text(text)) => message_text(text),
  336. Ok(Message::Ping(pi)) => message_ping(pi),
  337. Ok(Message::Pong(po)) => message_pong(po),
  338. Ok(Message::Binary(s)) => {
  339. //二进制WebSocket消息
  340. let message_str = format!("Binary:{:?}", s);
  341. trace!("{:?}",message_str);
  342. Option::from(ResponseData::new("".to_string(),
  343. "2".to_string(),
  344. message_str, "".to_string()))
  345. }
  346. Ok(Message::Close(c)) => {
  347. let message_str = format!("关闭指令:{:?}", c);
  348. trace!("{:?}",message_str);
  349. Option::from(ResponseData::new("".to_string(),
  350. "0".to_string(),
  351. message_str, "".to_string()))
  352. }
  353. Ok(Message::Frame(f)) => {
  354. //原始帧 正常读取数据不会读取到该 信息类型
  355. let message_str = format!("意外读取到原始帧:{:?}", f);
  356. trace!("{:?}",message_str);
  357. Option::from(ResponseData::new("".to_string(),
  358. "-2".to_string(),
  359. message_str, "".to_string()))
  360. }
  361. Err(e) => {
  362. let message_str = format!("服务器响应:{:?}", e);
  363. trace!("{:?}",message_str);
  364. Option::from(ResponseData::new("".to_string(),
  365. "-1".to_string(),
  366. message_str, "".to_string()))
  367. }
  368. }
  369. }
  370. //发送数据
  371. pub async fn send_subscribe(write_tx_clone: Arc<Mutex<UnboundedSender<Message>>>, message: Message) -> bool {
  372. let write_tx_clone = write_tx_clone.lock().await;
  373. write_tx_clone.unbounded_send(message.clone()).unwrap();
  374. trace!("发送指令:{:?}",message);
  375. true
  376. }
  377. }
  378. //创建链接
  379. pub async fn ws_connect_async(address_url: String) -> (SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
  380. SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>) {
  381. //1.是否走代理
  382. /*******走代理:根据环境变量配置来决定,如果配置了走代理,没有配置不走*******/
  383. let proxy = match proxy::ParsingDetail::env_proxy(ProxyEnum::WS) {
  384. ProxyResponseEnum::NO => {
  385. trace!("非 代理");
  386. None
  387. }
  388. ProxyResponseEnum::YES(proxy) => {
  389. trace!("代理");
  390. Option::from(proxy)
  391. }
  392. };
  393. let (ws_stream, _) = connect_async(address_url, proxy).await.expect("链接失败!");
  394. trace!("WebSocket 握手完成。");
  395. ws_stream.split()
  396. }
  397. pub async fn client(add_url: String) {
  398. let proxy = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(
  399. 127,
  400. 0,
  401. 0,
  402. 1)
  403. ), 7890);
  404. //创建通道 开启线程,向通道写入数据
  405. let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  406. let (read_tx, read_rx) = futures_channel::mpsc::unbounded();
  407. spawn(write_sell(write_tx));
  408. //创建socket,,并且读写分离
  409. let (ws_stream, _) = connect_async(add_url, Option::from(proxy)).await.expect("Failed to connect");
  410. trace!("WebSocket handshake has been successfully completed");
  411. let (write, read) = ws_stream.split();
  412. //将socket 的写操作与 写通道链接起来,将数据以ok的结构体封装进行传递
  413. let stdin_to_ws = write_rx.map(Ok).forward(write);
  414. let ws_to_stdout = {
  415. trace!("---1");
  416. //读,循环读取,然后拿到 message,,然后开启异步处理 message,
  417. let result = read.for_each(|message| async {
  418. read_tx.unbounded_send(message.unwrap()).unwrap();
  419. });
  420. trace!("---3");
  421. result
  422. };
  423. tokio::spawn(read_sell(read_rx));
  424. //必须操作。,因为不同于其他的高级语言,有自动内存管理,所以为了防范地址改变,所以需要做此处理
  425. pin_mut!(stdin_to_ws, ws_to_stdout);
  426. future::select(stdin_to_ws, ws_to_stdout).await;
  427. }
  428. //模拟 业务场景中 发送指令给目标交易所
  429. async fn write_sell(tx: futures_channel::mpsc::UnboundedSender<Message>) {
  430. let _str = serde_json::json!({
  431. "op": "subscribe",
  432. "args": [
  433. {
  434. // "channel":"orders",
  435. // "instType":"SWAP",
  436. // "instFamily":"BTC-USDT"
  437. "channel":"books5",
  438. "instId":"BTC-USDT"
  439. }
  440. ]
  441. });
  442. let str_array: Vec<String> = vec![
  443. // log_in_to_str(),
  444. // str.to_string(),
  445. ];
  446. let i = 0;
  447. loop {
  448. if str_array.len() > i {
  449. let send_str = str_array.get(i).unwrap();
  450. tx.unbounded_send(Message::Text(send_str.to_string())).unwrap();
  451. }
  452. tokio::time::sleep(Duration::from_secs(5)).await;
  453. tx.unbounded_send(Message::Ping(Vec::from("Ping"))).unwrap();
  454. tx.unbounded_send(Message::Ping(Vec::from("Pong"))).unwrap();
  455. }
  456. }
  457. async fn read_sell(mut rx: futures_channel::mpsc::UnboundedReceiver<Message>) {
  458. loop {
  459. if let Some(message) = rx.next().await {
  460. match message {
  461. Message::Text(s) => {
  462. trace!("Text: {}", s);
  463. }
  464. Message::Binary(s) => {
  465. trace!("Binary: {:?}", s);
  466. }
  467. Message::Ping(s) => {
  468. trace!("Ping: {:?}", s);
  469. }
  470. Message::Pong(s) => {
  471. trace!("Pong: {:?}", s);
  472. }
  473. Message::Close(s) => {
  474. trace!("Close: {:?}", s);
  475. }
  476. Message::Frame(s) => {
  477. trace!("Frame: {:?}", s);
  478. }
  479. }
  480. }
  481. tokio::time::sleep(Duration::from_millis(1)).await
  482. }
  483. }
  484. pub fn log_in_to_str() -> String {
  485. let mut login_json_str = "".to_string();
  486. let access_key: String = "".to_string();
  487. let secret_key: String = "".to_string();
  488. let passphrase: String = "".to_string();
  489. if access_key.len() > 0 || secret_key.len() > 0 || passphrase.len() > 0 {
  490. let timestamp = Utc::now().timestamp().to_string();
  491. // 时间戳 + 请求类型+ 请求参数字符串
  492. let message = format!("{}GET{}", timestamp, "/users/self/verify");
  493. let hmac_key = ring::hmac::Key::new(hmac::HMAC_SHA256, secret_key.as_bytes());
  494. let result = ring::hmac::sign(&hmac_key, &message.as_bytes());
  495. let sign = base64::encode(result);
  496. let login_json = json!({
  497. "op": "login",
  498. "args": [{
  499. "apiKey": access_key,
  500. "passphrase": passphrase,
  501. "timestamp": timestamp,
  502. "sign": sign }]
  503. });
  504. // trace!("---login_json:{0}", login_json.to_string());
  505. // trace!("--登陆:{}", login_json.to_string());
  506. login_json_str = login_json.to_string();
  507. }
  508. login_json_str
  509. }