socket_tool.rs 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551
  1. use std::net::{IpAddr, Ipv4Addr, SocketAddr};
  2. use std::sync::Arc;
  3. use std::sync::atomic::{AtomicBool, Ordering};
  4. use std::time::Duration;
  5. use chrono::Utc;
  6. use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
  7. use futures_util::{future, pin_mut, SinkExt, StreamExt};
  8. use futures_util::stream::{SplitSink, SplitStream};
  9. use ring::hmac;
  10. use serde_json::json;
  11. use tokio::net::TcpStream;
  12. use tokio::{spawn, time};
  13. use tokio::sync::Mutex;
  14. use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream};
  15. use tokio_tungstenite::tungstenite::{Error, Message};
  16. use tracing::{trace};
  17. use crate::proxy;
  18. use crate::proxy::{ProxyEnum, ProxyResponseEnum};
  19. use crate::response_base::ResponseData;
  20. #[derive(Debug)]
  21. pub enum HeartbeatType {
  22. Ping,
  23. Pong,
  24. Custom(String),
  25. }
  26. pub struct AbstractWsMode {}
  27. impl AbstractWsMode {
  28. //创建链接
  29. pub async fn ws_connect_async<T, PI, PO>(bool_v1: Arc<AtomicBool>,
  30. address_url: String,
  31. lable: String,
  32. subscribe_array: Vec<String>,
  33. mut write_rx: UnboundedReceiver<Message>,
  34. read_tx: UnboundedSender<ResponseData>,
  35. message_text: T,
  36. message_ping: PI,
  37. message_pong: PO) -> Result<(), Error>
  38. where T: Fn(String) -> Option<ResponseData> + Copy,
  39. PI: Fn(Vec<u8>) -> Option<ResponseData> + Copy,
  40. PO: Fn(Vec<u8>) -> Option<ResponseData> + Copy
  41. {
  42. //1.是否走代理
  43. /*******走代理:根据环境变量配置来决定,如果配置了走代理,没有配置不走*******/
  44. let proxy = match proxy::ParsingDetail::env_proxy(ProxyEnum::WS) {
  45. ProxyResponseEnum::NO => {
  46. // trace!("非 代理");
  47. None
  48. }
  49. ProxyResponseEnum::YES(proxy) => {
  50. // trace!("代理");
  51. Option::from(proxy)
  52. }
  53. };
  54. let read_arc = Arc::new(Mutex::new(read_tx));
  55. let read1 = read_arc.clone();
  56. loop {
  57. match connect_async(address_url.clone(), proxy).await {
  58. Ok((ws_stream, _)) => {
  59. trace!("WebSocket 握手完成。");
  60. let (write, mut read) = ws_stream.split();
  61. let write_arc = Arc::new(Mutex::new(write));
  62. let write_clone = Arc::clone(&write_arc);
  63. //订阅写入(包括订阅信息 )
  64. trace!("订阅内容:{:?}",subscribe_array.clone());
  65. for s in &subscribe_array {
  66. let mut write_lock = write_clone.lock().await;
  67. write_lock.send(Message::Text(s.parse().unwrap())).await?;
  68. }
  69. //将socket 的写操作与 写通道链接起来,将数据以ok的结构体封装进行传递
  70. // let stdin_to_ws = write_rx.map(Ok).forward(write);
  71. // Writing task
  72. let write_clone2 = Arc::clone(&write_arc);
  73. let stdin_to_ws = async {
  74. while let Some(message) = write_rx.next().await {
  75. let mut write_lock2 = write_clone2.lock().await;
  76. write_lock2.send(message).await?;
  77. }
  78. Ok::<(), Error>(())
  79. };
  80. let write_clone3 = Arc::clone(&write_arc);
  81. let ws_to_stdout = async {
  82. while let Some(message) = read.next().await {
  83. if !bool_v1.load(Ordering::Relaxed) {
  84. continue;
  85. }
  86. let mut write_lock3 = write_clone3.lock().await;
  87. let response_data = AbstractWsMode::analysis_message(message, message_text, message_ping, message_pong);
  88. // let response_data = func(message);
  89. if response_data.is_some() {
  90. let data = response_data.unwrap();
  91. if data.code == "200" {
  92. let mut data_c = data.clone();
  93. data_c.time = chrono::Utc::now().timestamp_micros();
  94. data_c.label = lable.clone();
  95. // if data_c.label.contains("gate_usdt_swap") {
  96. // if data_c.channel == "futures.order_book" {
  97. // if read_tx.len() == 0 {
  98. // read_tx.unbounded_send(data_c).unwrap();
  99. // }
  100. // } else {
  101. // read_tx.unbounded_send(data_c).unwrap();
  102. // }
  103. // } else if data_c.label.contains("binance_usdt_swap") {
  104. // if data_c.channel == "bookTicker" {
  105. // if read_tx.len() == 0 {
  106. // read_tx.unbounded_send(data_c).unwrap();
  107. // }
  108. // } else {
  109. // read_tx.unbounded_send(data_c).unwrap();
  110. // }
  111. // } else if data_c.label.contains("bybit_usdt_swap") {
  112. // if data_c.channel == "orderbook" {
  113. // if read_tx.len() == 0 {
  114. // read_tx.unbounded_send(data_c).unwrap();
  115. // }
  116. // } else {
  117. // read_tx.unbounded_send(data_c).unwrap();
  118. // }
  119. // } else {
  120. // if read_tx.len() == 0 {
  121. let r = read1.clone();
  122. spawn(async move {
  123. r.lock().await.unbounded_send(data_c).unwrap();
  124. });
  125. // }
  126. // }
  127. }
  128. let code = data.code.clone();
  129. /*
  130. 200 -正确返回
  131. -200 -登录成功
  132. -201 -订阅成功
  133. -300 -客户端收到服务器心跳ping,需要响应
  134. -301 -客户端收到服务器心跳pong,需要响应
  135. -302 -客户端收到服务器心跳自定义,需要响应自定义
  136. */
  137. match code.as_str() {
  138. "200" => {
  139. }
  140. "-200" => {
  141. //登录成功
  142. trace!("登录成功:{:?}", data);
  143. }
  144. "-201" => {
  145. //订阅成功
  146. trace!("订阅成功:{:?}", data);
  147. }
  148. "-300" => {
  149. //服务器发送心跳 ping 给客户端,客户端需要pong回应
  150. trace!("服务器响应-ping");
  151. if data.data.len() > 0 {
  152. write_lock3.send(Message::Pong(Vec::from(data.data))).await?;
  153. trace!("客户端回应服务器-pong");
  154. }
  155. }
  156. "-301" => {
  157. //服务器发送心跳 pong 给客户端,客户端需要ping回应
  158. trace!("服务器响应-pong");
  159. if data.data.len() > 0 {
  160. write_lock3.send(Message::Ping(Vec::from(data.data))).await?;
  161. trace!("客户端回应服务器-ping");
  162. }
  163. }
  164. "-302" => {
  165. //客户端收到服务器心跳自定义,需要响应自定义
  166. trace!("特定字符心跳,特殊响应:{:?}", data);
  167. write_lock3.send(Message::Text(data.data)).await?;
  168. trace!("特殊字符心跳-回应完成");
  169. }
  170. _ => {
  171. trace!("未知:{:?}",data);
  172. }
  173. }
  174. }
  175. }
  176. Ok::<(), Error>(())
  177. };
  178. // 防止 cpu 休眠。
  179. let read2 = read_arc.clone();
  180. spawn(async move {
  181. let t_str = "t".to_string();
  182. let response = ResponseData::new(t_str.clone(),
  183. t_str.clone(),
  184. t_str.clone(),
  185. t_str.clone());
  186. loop {
  187. time::sleep(Duration::from_micros(5)).await;
  188. let t = response.clone();
  189. let r = read2.clone();
  190. spawn(async move {
  191. let read = r.lock().await;
  192. if read.is_empty() {
  193. read.unbounded_send(t.clone()).unwrap();
  194. }
  195. });
  196. }
  197. });
  198. //必须操作。,因为不同于其他的高级语言,有自动内存管理,所以为了防范地址改变,所以需要做此处理
  199. pin_mut!(stdin_to_ws, ws_to_stdout,);
  200. future::select(stdin_to_ws, ws_to_stdout).await;
  201. }
  202. Err(e) => {
  203. trace!("WebSocket 握手失败:{:?}",e);
  204. }
  205. }
  206. trace!("---5");
  207. trace!("---4");
  208. trace!("重启...");
  209. // let (ws_stream, _) = connect_async(address_url.clone(), proxy).await.unwrap();
  210. // trace!("WebSocket 握手完成。");
  211. // let (write, mut read) = ws_stream.split();
  212. //
  213. // let write_arc = Arc::new(Mutex::new(write));
  214. // let write_clone = Arc::clone(&write_arc);
  215. // //订阅写入(包括订阅信息 )
  216. // trace!("订阅内容:{:?}",subscribe_array.clone());
  217. // for s in &subscribe_array {
  218. // let mut write_lock = write_clone.lock().await;
  219. // write_lock.send(Message::Text(s.parse().unwrap())).await?;
  220. // }
  221. //
  222. // //将socket 的写操作与 写通道链接起来,将数据以ok的结构体封装进行传递
  223. // // let stdin_to_ws = write_rx.map(Ok).forward(write);
  224. // // Writing task
  225. // let write_clone2 = Arc::clone(&write_arc);
  226. // let stdin_to_ws = async {
  227. // while let Some(message) = write_rx.next().await {
  228. // let mut write_lock2 = write_clone2.lock().await;
  229. // write_lock2.send(message).await?;
  230. // }
  231. // Ok::<(), tokio_tungstenite::tungstenite::Error>(())
  232. // };
  233. // let write_clone3 = Arc::clone(&write_arc);
  234. // let ws_to_stdout = async {
  235. // while let Some(message) = read.next().await {
  236. // let mut write_lock3 = write_clone3.lock().await;
  237. // let response_data = AbstractWsMode::analysis_message(message, message_text, message_ping, message_pong);
  238. // // let response_data = func(message);
  239. // if response_data.is_some() {
  240. // let mut data = response_data.unwrap();
  241. // data.label = lable.clone();
  242. // let code = data.code.clone();
  243. // /*
  244. // 200 -正确返回
  245. // -200 -登录成功
  246. // -201 -订阅成功
  247. // -300 -客户端收到服务器心跳ping,需要响应
  248. // -301 -客户端收到服务器心跳pong,需要响应
  249. // -302 -客户端收到服务器心跳自定义,需要响应自定义
  250. // */
  251. // match code.as_str() {
  252. // "200" => {
  253. // if bool_v1.load(Ordering::Relaxed) {
  254. // read_tx.unbounded_send(data).unwrap();
  255. // }
  256. // }
  257. // "-200" => {
  258. // //登录成功
  259. // trace!("登录成功:{:?}", data);
  260. // }
  261. // "-201" => {
  262. // //订阅成功
  263. // trace!("订阅成功:{:?}", data);
  264. // }
  265. // "-300" => {
  266. // //服务器发送心跳 ping 给客户端,客户端需要pong回应
  267. // trace!("服务器响应-ping");
  268. // if data.data.len() > 0 {
  269. // write_lock3.send(Message::Pong(Vec::from(data.data))).await?;
  270. // trace!("客户端回应服务器-pong");
  271. // }
  272. // }
  273. // "-301" => {
  274. // //服务器发送心跳 pong 给客户端,客户端需要ping回应
  275. // trace!("服务器响应-pong");
  276. // if data.data.len() > 0 {
  277. // write_lock3.send(Message::Ping(Vec::from(data.data))).await?;
  278. // trace!("客户端回应服务器-ping");
  279. // }
  280. // }
  281. // "-302" => {
  282. // //客户端收到服务器心跳自定义,需要响应自定义
  283. // trace!("特定字符心跳,特殊响应:{:?}", data);
  284. // write_lock3.send(Message::Text(data.data)).await?;
  285. // trace!("特殊字符心跳-回应完成");
  286. // }
  287. // _ => {
  288. // trace!("未知:{:?}",data);
  289. // }
  290. // }
  291. // }
  292. // }
  293. // Ok::<(), tokio_tungstenite::tungstenite::Error>(())
  294. // };
  295. //
  296. // //必须操作。,因为不同于其他的高级语言,有自动内存管理,所以为了防范地址改变,所以需要做此处理
  297. // pin_mut!(stdin_to_ws, ws_to_stdout,);
  298. // future::select(stdin_to_ws, ws_to_stdout).await;
  299. // trace!("---5");
  300. // trace!("---4");
  301. // trace!("重启...");
  302. }
  303. // return Ok(());
  304. }
  305. //心跳包
  306. pub async fn ping_or_pong(write_tx_clone: Arc<Mutex<UnboundedSender<Message>>>, h_type: HeartbeatType, millis: u64) {
  307. loop {
  308. tokio::time::sleep(Duration::from_millis(millis)).await;
  309. let write_tx_clone = write_tx_clone.lock().await;
  310. write_tx_clone.unbounded_send(
  311. match h_type {
  312. HeartbeatType::Ping => {
  313. Message::Ping(Vec::from("Ping"))
  314. }
  315. HeartbeatType::Pong => {
  316. Message::Pong(Vec::from("Pong"))
  317. }
  318. HeartbeatType::Custom(ref str) => {
  319. Message::Text(str.parse().unwrap())
  320. }
  321. }
  322. ).expect("发送失败");
  323. trace!("发送指令-心跳:{:?}",h_type);
  324. }
  325. }
  326. //数据解析
  327. pub fn analysis_message<T, PI, PO>(message: Result<Message, Error>,
  328. message_text: T,
  329. message_ping: PI,
  330. message_pong: PO) -> Option<ResponseData>
  331. where T: Fn(String) -> Option<ResponseData>,
  332. PI: Fn(Vec<u8>) -> Option<ResponseData>,
  333. PO: Fn(Vec<u8>) -> Option<ResponseData>
  334. {
  335. match message {
  336. Ok(Message::Text(text)) => message_text(text),
  337. Ok(Message::Ping(pi)) => message_ping(pi),
  338. Ok(Message::Pong(po)) => message_pong(po),
  339. Ok(Message::Binary(s)) => {
  340. //二进制WebSocket消息
  341. let message_str = format!("Binary:{:?}", s);
  342. trace!("{:?}",message_str);
  343. Option::from(ResponseData::new("".to_string(),
  344. "2".to_string(),
  345. message_str, "".to_string()))
  346. }
  347. Ok(Message::Close(c)) => {
  348. let message_str = format!("关闭指令:{:?}", c);
  349. trace!("{:?}",message_str);
  350. Option::from(ResponseData::new("".to_string(),
  351. "0".to_string(),
  352. message_str, "".to_string()))
  353. }
  354. Ok(Message::Frame(f)) => {
  355. //原始帧 正常读取数据不会读取到该 信息类型
  356. let message_str = format!("意外读取到原始帧:{:?}", f);
  357. trace!("{:?}",message_str);
  358. Option::from(ResponseData::new("".to_string(),
  359. "-2".to_string(),
  360. message_str, "".to_string()))
  361. }
  362. Err(e) => {
  363. let message_str = format!("服务器响应:{:?}", e);
  364. trace!("{:?}",message_str);
  365. Option::from(ResponseData::new("".to_string(),
  366. "-1".to_string(),
  367. message_str, "".to_string()))
  368. }
  369. }
  370. }
  371. //发送数据
  372. pub async fn send_subscribe(write_tx_clone: Arc<Mutex<UnboundedSender<Message>>>, message: Message) -> bool {
  373. let write_tx_clone = write_tx_clone.lock().await;
  374. write_tx_clone.unbounded_send(message.clone()).unwrap();
  375. trace!("发送指令:{:?}",message);
  376. true
  377. }
  378. }
  379. //创建链接
  380. pub async fn ws_connect_async(address_url: String) -> (SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
  381. SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>) {
  382. //1.是否走代理
  383. /*******走代理:根据环境变量配置来决定,如果配置了走代理,没有配置不走*******/
  384. let proxy = match proxy::ParsingDetail::env_proxy(ProxyEnum::WS) {
  385. ProxyResponseEnum::NO => {
  386. trace!("非 代理");
  387. None
  388. }
  389. ProxyResponseEnum::YES(proxy) => {
  390. trace!("代理");
  391. Option::from(proxy)
  392. }
  393. };
  394. let (ws_stream, _) = connect_async(address_url, proxy).await.expect("链接失败!");
  395. trace!("WebSocket 握手完成。");
  396. ws_stream.split()
  397. }
  398. pub async fn client(add_url: String) {
  399. let proxy = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(
  400. 127,
  401. 0,
  402. 0,
  403. 1)
  404. ), 7890);
  405. //创建通道 开启线程,向通道写入数据
  406. let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  407. let (read_tx, read_rx) = futures_channel::mpsc::unbounded();
  408. spawn(write_sell(write_tx));
  409. //创建socket,,并且读写分离
  410. let (ws_stream, _) = connect_async(add_url, Option::from(proxy)).await.expect("Failed to connect");
  411. trace!("WebSocket handshake has been successfully completed");
  412. let (write, read) = ws_stream.split();
  413. //将socket 的写操作与 写通道链接起来,将数据以ok的结构体封装进行传递
  414. let stdin_to_ws = write_rx.map(Ok).forward(write);
  415. let ws_to_stdout = {
  416. trace!("---1");
  417. //读,循环读取,然后拿到 message,,然后开启异步处理 message,
  418. let result = read.for_each(|message| async {
  419. read_tx.unbounded_send(message.unwrap()).unwrap();
  420. });
  421. trace!("---3");
  422. result
  423. };
  424. tokio::spawn(read_sell(read_rx));
  425. //必须操作。,因为不同于其他的高级语言,有自动内存管理,所以为了防范地址改变,所以需要做此处理
  426. pin_mut!(stdin_to_ws, ws_to_stdout);
  427. future::select(stdin_to_ws, ws_to_stdout).await;
  428. }
  429. //模拟 业务场景中 发送指令给目标交易所
  430. async fn write_sell(tx: futures_channel::mpsc::UnboundedSender<Message>) {
  431. let _str = serde_json::json!({
  432. "op": "subscribe",
  433. "args": [
  434. {
  435. // "channel":"orders",
  436. // "instType":"SWAP",
  437. // "instFamily":"BTC-USDT"
  438. "channel":"books5",
  439. "instId":"BTC-USDT"
  440. }
  441. ]
  442. });
  443. let str_array: Vec<String> = vec![
  444. // log_in_to_str(),
  445. // str.to_string(),
  446. ];
  447. let i = 0;
  448. loop {
  449. if str_array.len() > i {
  450. let send_str = str_array.get(i).unwrap();
  451. tx.unbounded_send(Message::Text(send_str.to_string())).unwrap();
  452. }
  453. tokio::time::sleep(Duration::from_secs(5)).await;
  454. tx.unbounded_send(Message::Ping(Vec::from("Ping"))).unwrap();
  455. tx.unbounded_send(Message::Ping(Vec::from("Pong"))).unwrap();
  456. }
  457. }
  458. async fn read_sell(mut rx: futures_channel::mpsc::UnboundedReceiver<Message>) {
  459. loop {
  460. if let Some(message) = rx.next().await {
  461. match message {
  462. Message::Text(s) => {
  463. trace!("Text: {}", s);
  464. }
  465. Message::Binary(s) => {
  466. trace!("Binary: {:?}", s);
  467. }
  468. Message::Ping(s) => {
  469. trace!("Ping: {:?}", s);
  470. }
  471. Message::Pong(s) => {
  472. trace!("Pong: {:?}", s);
  473. }
  474. Message::Close(s) => {
  475. trace!("Close: {:?}", s);
  476. }
  477. Message::Frame(s) => {
  478. trace!("Frame: {:?}", s);
  479. }
  480. }
  481. }
  482. tokio::time::sleep(Duration::from_millis(1)).await
  483. }
  484. }
  485. pub fn log_in_to_str() -> String {
  486. let mut login_json_str = "".to_string();
  487. let access_key: String = "".to_string();
  488. let secret_key: String = "".to_string();
  489. let passphrase: String = "".to_string();
  490. if access_key.len() > 0 || secret_key.len() > 0 || passphrase.len() > 0 {
  491. let timestamp = Utc::now().timestamp().to_string();
  492. // 时间戳 + 请求类型+ 请求参数字符串
  493. let message = format!("{}GET{}", timestamp, "/users/self/verify");
  494. let hmac_key = ring::hmac::Key::new(hmac::HMAC_SHA256, secret_key.as_bytes());
  495. let result = ring::hmac::sign(&hmac_key, &message.as_bytes());
  496. let sign = base64::encode(result);
  497. let login_json = json!({
  498. "op": "login",
  499. "args": [{
  500. "apiKey": access_key,
  501. "passphrase": passphrase,
  502. "timestamp": timestamp,
  503. "sign": sign }]
  504. });
  505. // trace!("---login_json:{0}", login_json.to_string());
  506. // trace!("--登陆:{}", login_json.to_string());
  507. login_json_str = login_json.to_string();
  508. }
  509. login_json_str
  510. }