// socket_tool.rs
  1. use std::net::{IpAddr, Ipv4Addr, SocketAddr};
  2. use std::sync::Arc;
  3. use std::sync::atomic::{AtomicBool, Ordering};
  4. use std::time::Duration;
  5. use chrono::Utc;
  6. use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
  7. use futures_util::{future, pin_mut, SinkExt, StreamExt};
  8. use futures_util::stream::{SplitSink, SplitStream};
  9. use ring::hmac;
  10. use serde_json::{json, Value};
  11. use tokio::net::TcpStream;
  12. use tokio::sync::Mutex;
  13. use tokio::time::Instant;
  14. use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream};
  15. use tokio_tungstenite::tungstenite::{Error, Message};
  16. use tracing::{error, info, trace, warn};
  17. use crate::exchange::proxy;
  18. use crate::exchange::proxy::{ProxyEnum, ProxyResponseEnum};
  19. use crate::exchange::response_base::Response;
  20. #[derive(Debug)]
  21. pub enum HeartbeatType {
  22. Ping,
  23. Pong,
  24. Custom(String),
  25. }
  26. pub struct AbstractWsMode {}
  27. impl AbstractWsMode {
  28. pub async fn ws_connected<T, PI, PO, F, B, Future>(write_to_socket_rx_arc: Arc<Mutex<UnboundedReceiver<Message>>>,
  29. is_first_login: bool,
  30. label: String,
  31. is_shutdown_arc: Arc<AtomicBool>,
  32. handle_function: &F,
  33. subscribe_array: Vec<String>,
  34. ws_stream: WebSocketStream<MaybeTlsStream<TcpStream>>,
  35. message_text: T,
  36. message_ping: PI,
  37. message_pong: PO,
  38. message_binary: B)
  39. where T: Fn(String) -> Option<Response> + Copy,
  40. PI: Fn(Vec<u8>) -> Option<Response> + Copy,
  41. PO: Fn(Vec<u8>) -> Option<Response> + Copy,
  42. F: Fn(Response) -> Future + Clone,
  43. B: Fn(Vec<u8>) -> Option<Response> + Copy,
  44. Future: future::Future<Output=()> + Send + 'static,
  45. {
  46. let (ws_write, mut ws_read) = ws_stream.split();
  47. let ws_write_arc = Arc::new(Mutex::new(ws_write));
  48. // 将socket 的写操作与【写通道(外部向socket写)】链接起来,将数据以ok的结构体封装进行传递
  49. // 这里是形成链式操作,如果要将外界的信息传进来(使用socket查单、下单之类的,部分交易所可以支持),就要这要弄
  50. let mut write_to_socket_rx = write_to_socket_rx_arc.lock().await;
  51. let ws_write_channel_clone = Arc::clone(&ws_write_arc);
  52. let stdin_to_ws = async {
  53. while let Some(message) = write_to_socket_rx.next().await {
  54. let mut write_lock2 = ws_write_channel_clone.lock().await;
  55. write_lock2.send(message).await?;
  56. }
  57. Ok::<(), Error>(())
  58. };
  59. // 如果不需要事先登录,则直接订阅消息
  60. if !is_first_login {
  61. trace!("不需要先登录,订阅内容:");
  62. for s in &subscribe_array {
  63. trace!("{}", s);
  64. let mut write_lock = ws_write_arc.lock().await;
  65. write_lock.send(Message::Text(s.parse().unwrap())).await.expect("订阅消息失败");
  66. }
  67. }
  68. let ws_write_inner = Arc::clone(&ws_write_arc);
  69. let ws_to_stdout = async {
  70. while let Some(message) = ws_read.next().await {
  71. if !is_shutdown_arc.load(Ordering::Relaxed) {
  72. continue;
  73. }
  74. let response_data = AbstractWsMode::analysis_message(message, message_text, message_ping, message_pong, message_binary);
  75. // let response_data = func(message);
  76. if response_data.is_some() {
  77. let mut data = response_data.unwrap();
  78. data.label = label.clone();
  79. let code = data.code.clone();
  80. if code == 200 {
  81. let mut data_c = data.clone();
  82. data_c.ins = Instant::now();
  83. data_c.time = Utc::now().timestamp_millis();
  84. handle_function(data_c).await;
  85. }
  86. /*
  87. 200 -正确返回
  88. -200 -登录成功
  89. -201 -订阅成功
  90. -300 -客户端收到服务器心跳ping,需要响应
  91. -301 -客户端收到服务器心跳pong,需要响应
  92. -302 -客户端收到服务器心跳自定义,需要响应自定义
  93. */
  94. match code {
  95. 200 => {
  96. let mut data_c = data.clone();
  97. data_c.ins = Instant::now();
  98. data_c.time = Utc::now().timestamp_millis();
  99. handle_function(data_c).await;
  100. }
  101. -200 => {
  102. //登录成功
  103. info!("ws登录成功:{:?}", data);
  104. if is_first_login {
  105. for s in &subscribe_array {
  106. info!("订阅内容:{}", s);
  107. let mut write_lock = ws_write_arc.lock().await;
  108. write_lock.send(Message::Text(s.parse().unwrap())).await.expect("订阅消息失败");
  109. }
  110. info!("订阅完成!");
  111. }
  112. }
  113. -201 => {
  114. // 订阅成功
  115. // trace!("订阅成功:{:?}", data);
  116. }
  117. -300 => {
  118. //服务器发送心跳 ping 给客户端,客户端需要pong回应
  119. trace!("服务器响应-ping");
  120. if data.data != Value::Null {
  121. let mut ws_write = ws_write_inner.lock().await;
  122. ws_write.send(Message::Pong(Vec::from(data.data.to_string()))).await?;
  123. trace!("客户端回应服务器-pong");
  124. }
  125. }
  126. -301 => {
  127. //服务器发送心跳 pong 给客户端,客户端需要ping回应
  128. trace!("服务器响应-pong");
  129. if data.data != Value::Null {
  130. let mut ws_write = ws_write_inner.lock().await;
  131. ws_write.send(Message::Ping(Vec::from(data.data.to_string()))).await?;
  132. trace!("客户端回应服务器-ping");
  133. }
  134. }
  135. -302 => {
  136. //客户端收到服务器心跳自定义,需要响应自定义
  137. trace!("特定字符心跳,特殊响应:{:?}", data);
  138. let mut ws_write = ws_write_inner.lock().await;
  139. ws_write.send(Message::Text(data.data.to_string())).await?;
  140. trace!("特殊字符心跳-回应完成");
  141. }
  142. _ => {
  143. error!("未知:{:?}", data);
  144. }
  145. }
  146. }
  147. }
  148. Ok::<(), Error>(())
  149. };
  150. //必须操作。,因为不同于其他的高级语言,有自动内存管理,所以为了防范地址改变,所以需要做此处理
  151. pin_mut!(stdin_to_ws, ws_to_stdout,);
  152. future::select(stdin_to_ws, ws_to_stdout).await;
  153. }
  154. //创建链接
  155. pub async fn ws_connect_async<T, PI, PO, F, B, Future>(is_shutdown_arc: Arc<AtomicBool>,
  156. handle_function: F,
  157. address_url: String,
  158. is_first_login: bool,
  159. label: String,
  160. subscribe_array: Vec<String>,
  161. write_to_socket_rx_arc: Arc<Mutex<UnboundedReceiver<Message>>>,
  162. message_text: T,
  163. message_ping: PI,
  164. message_pong: PO,
  165. message_binary: B)
  166. where T: Fn(String) -> Option<Response> + Copy,
  167. PI: Fn(Vec<u8>) -> Option<Response> + Copy,
  168. PO: Fn(Vec<u8>) -> Option<Response> + Copy,
  169. B: Fn(Vec<u8>) -> Option<Response> + Copy,
  170. F: Fn(Response) -> Future + Clone,
  171. Future: future::Future<Output=()> + Send + 'static,
  172. {
  173. //1.是否走代理
  174. /*******走代理:根据环境变量配置来决定,如果配置了走代理,没有配置不走*******/
  175. let proxy = match proxy::ParsingDetail::env_proxy(ProxyEnum::WS) {
  176. ProxyResponseEnum::NO => {
  177. // trace!("非 代理");
  178. None
  179. }
  180. ProxyResponseEnum::YES(proxy) => {
  181. // trace!("代理");
  182. Option::from(proxy)
  183. }
  184. };
  185. match connect_async(address_url.clone(), proxy).await {
  186. Ok((ws_stream, _)) => {
  187. trace!("socket 链接成功,{}。", address_url);
  188. Self::ws_connected(write_to_socket_rx_arc,
  189. is_first_login,
  190. label,
  191. is_shutdown_arc,
  192. &handle_function,
  193. subscribe_array.clone(),
  194. ws_stream,
  195. message_text,
  196. message_ping,
  197. message_pong,
  198. message_binary).await;
  199. }
  200. Err(e) => {
  201. warn!("WebSocket 握手失败:{:?}", e);
  202. }
  203. }
  204. }
  205. //心跳包
  206. pub async fn ping_or_pong(write_tx_clone: Arc<Mutex<UnboundedSender<Message>>>, h_type: HeartbeatType, millis: u64) {
  207. loop {
  208. tokio::time::sleep(Duration::from_millis(millis)).await;
  209. let write_tx_clone = write_tx_clone.lock().await;
  210. match write_tx_clone.unbounded_send(
  211. match h_type {
  212. HeartbeatType::Ping => {
  213. Message::Ping(Vec::from("Ping"))
  214. }
  215. HeartbeatType::Pong => {
  216. Message::Pong(Vec::from("Pong"))
  217. }
  218. HeartbeatType::Custom(ref str) => {
  219. Message::Text(str.parse().unwrap())
  220. }
  221. }
  222. ) {
  223. Ok(_o) => {
  224. trace!("发送指令-心跳:{:?}",h_type);
  225. }
  226. Err(k) => {
  227. error!("发送失败:原因{:?}",k)
  228. }
  229. }
  230. // write_tx_clone.unbounded_send(
  231. // match h_type {
  232. // HeartbeatType::Ping => {
  233. // Message::Ping(Vec::from("Ping"))
  234. // }
  235. // HeartbeatType::Pong => {
  236. // Message::Pong(Vec::from("Pong"))
  237. // }
  238. // HeartbeatType::Custom(ref str) => {
  239. // Message::Text(str.parse().unwrap())
  240. // }
  241. // }
  242. // ).expect("发送失败");
  243. }
  244. }
  245. //数据解析
  246. pub fn analysis_message<T, PI, PO, B>(message: Result<Message, Error>,
  247. message_text: T,
  248. message_ping: PI,
  249. message_pong: PO,
  250. message_binary: B) -> Option<Response>
  251. where T: Fn(String) -> Option<Response>,
  252. PI: Fn(Vec<u8>) -> Option<Response>,
  253. PO: Fn(Vec<u8>) -> Option<Response>,
  254. B: Fn(Vec<u8>) -> Option<Response>
  255. {
  256. match message {
  257. Ok(Message::Text(text)) => message_text(text),
  258. Ok(Message::Ping(pi)) => message_ping(pi),
  259. Ok(Message::Pong(po)) => message_pong(po),
  260. Ok(Message::Binary(s)) => message_binary(s), //二进制WebSocket消息
  261. Ok(Message::Close(c)) => {
  262. let message_str = format!("关闭指令:{:?}", c);
  263. trace!("{:?}",message_str);
  264. Option::from(Response::new("".to_string(), 0, message_str, Value::Null))
  265. }
  266. Ok(Message::Frame(f)) => {
  267. //原始帧 正常读取数据不会读取到该 信息类型
  268. let message_str = format!("意外读取到原始帧:{:?}", f);
  269. trace!("{:?}",message_str);
  270. Option::from(Response::new("".to_string(), -2, message_str, Value::Null))
  271. }
  272. Err(e) => {
  273. let message_str = format!("服务器响应:{:?}", e);
  274. trace!("{:?}",message_str);
  275. Option::from(Response::new("".to_string(), -1, message_str, Value::Null))
  276. }
  277. }
  278. }
  279. //发送数据
  280. pub async fn send_subscribe(write_tx_clone: Arc<Mutex<UnboundedSender<Message>>>, message: Message) -> bool {
  281. let write_tx_clone = write_tx_clone.lock().await;
  282. write_tx_clone.unbounded_send(message.clone()).unwrap();
  283. trace!("发送指令:{:?}",message);
  284. true
  285. }
  286. }
  287. //创建链接
  288. pub async fn ws_connect_async(address_url: String) -> (SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
  289. SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>) {
  290. //1.是否走代理
  291. /*******走代理:根据环境变量配置来决定,如果配置了走代理,没有配置不走*******/
  292. let proxy = match proxy::ParsingDetail::env_proxy(ProxyEnum::WS) {
  293. ProxyResponseEnum::NO => {
  294. trace!("非代理");
  295. None
  296. }
  297. ProxyResponseEnum::YES(proxy) => {
  298. trace!("代理");
  299. Option::from(proxy)
  300. }
  301. };
  302. let (ws_stream, _) = connect_async(address_url, proxy).await.expect("链接失败!");
  303. trace!("WebSocket 握手完成。");
  304. ws_stream.split()
  305. }
  306. pub async fn client(add_url: String) {
  307. let proxy = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(
  308. 127,
  309. 0,
  310. 0,
  311. 1)
  312. ), 7890);
  313. //创建通道 开启线程,向通道写入数据
  314. let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  315. let (read_tx, read_rx) = futures_channel::mpsc::unbounded();
  316. tokio::spawn(write_sell(write_tx));
  317. //创建socket,并且读写分离
  318. let (ws_stream, _) = connect_async(add_url, Option::from(proxy)).await.expect("Failed to connect");
  319. trace!("WebSocket handshake has been successfully completed");
  320. let (write, read) = ws_stream.split();
  321. //将socket 的写操作与 写通道链接起来,将数据以ok的结构体封装进行传递
  322. let stdin_to_ws = write_rx.map(Ok).forward(write);
  323. let ws_to_stdout = {
  324. trace!("---1");
  325. //读,循环读取,然后拿到 message,,然后开启异步处理 message,
  326. let result = read.for_each(|message| async {
  327. read_tx.unbounded_send(message.unwrap()).unwrap();
  328. });
  329. trace!("---3");
  330. result
  331. };
  332. tokio::spawn(read_sell(read_rx));
  333. //必须操作。,因为不同于其他的高级语言,有自动内存管理,所以为了防范地址改变,所以需要做此处理
  334. pin_mut!(stdin_to_ws, ws_to_stdout);
  335. future::select(stdin_to_ws, ws_to_stdout).await;
  336. }
  337. //模拟 业务场景中 发送指令给目标交易所
  338. async fn write_sell(tx: UnboundedSender<Message>) {
  339. let _str = json!({
  340. "op": "subscribe",
  341. "args": [
  342. {
  343. // "channel":"orders",
  344. // "instType":"SWAP",
  345. // "instFamily":"BTC-USDT"
  346. "channel":"books5",
  347. "instId":"BTC-USDT"
  348. }
  349. ]
  350. });
  351. let str_array: Vec<String> = vec![
  352. // log_in_to_str(),
  353. // str.to_string(),
  354. ];
  355. let i = 0;
  356. loop {
  357. if str_array.len() > i {
  358. let send_str = str_array.get(i).unwrap();
  359. tx.unbounded_send(Message::Text(send_str.to_string())).unwrap();
  360. }
  361. tokio::time::sleep(Duration::from_secs(5)).await;
  362. tx.unbounded_send(Message::Ping(Vec::from("Ping"))).unwrap();
  363. tx.unbounded_send(Message::Ping(Vec::from("Pong"))).unwrap();
  364. }
  365. }
  366. async fn read_sell(mut rx: UnboundedReceiver<Message>) {
  367. loop {
  368. if let Some(message) = rx.next().await {
  369. match message {
  370. Message::Text(s) => {
  371. trace!("Text: {}", s);
  372. }
  373. Message::Binary(s) => {
  374. trace!("Binary: {:?}", s);
  375. }
  376. Message::Ping(s) => {
  377. trace!("Ping: {:?}", s);
  378. }
  379. Message::Pong(s) => {
  380. trace!("Pong: {:?}", s);
  381. }
  382. Message::Close(s) => {
  383. trace!("Close: {:?}", s);
  384. }
  385. Message::Frame(s) => {
  386. trace!("Frame: {:?}", s);
  387. }
  388. }
  389. }
  390. tokio::time::sleep(Duration::from_millis(1)).await
  391. }
  392. }
  393. pub fn log_in_to_str() -> String {
  394. let mut login_json_str = "".to_string();
  395. let access_key: String = "".to_string();
  396. let secret_key: String = "".to_string();
  397. let passphrase: String = "".to_string();
  398. if access_key.len() > 0 || secret_key.len() > 0 || passphrase.len() > 0 {
  399. let timestamp = Utc::now().timestamp().to_string();
  400. // 时间戳 + 请求类型+ 请求参数字符串
  401. let message = format!("{}GET{}", timestamp, "/users/self/verify");
  402. let hmac_key = ring::hmac::Key::new(hmac::HMAC_SHA256, secret_key.as_bytes());
  403. let result = ring::hmac::sign(&hmac_key, &message.as_bytes());
  404. let sign = base64::encode(result);
  405. let login_json = json!({
  406. "op": "login",
  407. "args": [{
  408. "apiKey": access_key,
  409. "passphrase": passphrase,
  410. "timestamp": timestamp,
  411. "sign": sign
  412. }]
  413. });
  414. // trace!("---login_json:{0}", login_json.to_string());
  415. // trace!("--登录:{}", login_json.to_string());
  416. login_json_str = login_json.to_string();
  417. }
  418. login_json_str
  419. }