// bingx_swap_ws.rs
  1. use std::io::Read;
  2. use std::sync::Arc;
  3. use std::sync::atomic::AtomicBool;
  4. use std::time::Duration;
  5. use flate2::read::GzDecoder;
  6. use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
  7. use serde_json::json;
  8. use serde_json::Value;
  9. use tokio::sync::Mutex;
  10. use tokio_tungstenite::tungstenite::{Error, Message};
  11. use tracing::{error, info, trace};
  12. use crate::response_base::ResponseData;
  13. use crate::socket_tool::AbstractWsMode;
  /// Kind of WebSocket endpoint a `BingxSwapWs` client connects to.
  pub enum BingxSwapWsType {
      /// The only endpoint kind defined here; `BingxSwapWs::new_with_tag` maps it
      /// to the swap-market URL. The name suggests it serves both public and
      /// private channels.
      PublicAndPrivate,
  }
  /// Connection parameters stored on a `BingxSwapWs` client.
  ///
  /// `BingxSwapWs::new_with_tag` fills every field with an empty / zero
  /// placeholder.
  #[derive(Debug, Clone)]
  pub struct BingxSwapWsParam {
      /// NOTE(review): presumably an authentication or session token — confirm.
      pub token: String,
      /// NOTE(review): presumably a server-provided WebSocket URL — confirm.
      pub ws_url: String,
      /// Ping interval; the unit is not established by the visible code.
      pub ws_ping_interval: i64,
      /// Ping timeout; the unit is not established by the visible code.
      pub ws_ping_timeout: i64,
      /// NOTE(review): looks like a "subscription acknowledged" flag — confirm.
      pub is_ok_subscribe: bool,
  }
  /// Subscription channels that can be requested for each symbol.
  ///
  /// `BingxSwapWs::enum_to_string` turns a variant plus a symbol into the
  /// `dataType` string sent in the subscribe request.
  #[derive(Clone)]
  pub enum BingxSwapSubscribeType {
      /// Order-book depth (`<symbol>@depth5@100ms`).
      PuFuturesDepth,
      /// Public trades (`<symbol>@trade`).
      PuFuturesTrades,
      /// Candlestick (K-line) data (`<symbol>@kline_1m`).
      PuFuturesRecords,
  }
  /// Account credentials for a `BingxSwapWs` client.
  ///
  /// The visible code only stores these values; it never reads them.
  #[derive(Debug, Clone)]
  pub struct BingxSwapLogin {
      /// API access key.
      pub access_key: String,
      /// API secret key.
      pub secret_key: String,
      /// NOTE(review): presumably an API passphrase — confirm against callers.
      pub pass_key: String,
  }
  44. #[derive(Clone)]
  45. #[allow(dead_code)]
  46. pub struct BingxSwapWs {
  47. //类型
  48. tag: String,
  49. //地址
  50. address_url: String,
  51. //账号
  52. login_param: Option<BingxSwapLogin>,
  53. //登录数据
  54. ws_param: BingxSwapWsParam,
  55. //币对
  56. symbol_s: Vec<String>,
  57. //订阅
  58. subscribe_types: Vec<BingxSwapSubscribeType>,
  59. //心跳间隔
  60. heartbeat_time: u64,
  61. }
  62. impl BingxSwapWs {
  63. /*******************************************************************************************************/
  64. /*****************************************获取一个对象****************************************************/
  65. /*******************************************************************************************************/
  66. pub fn new(is_colo: bool, login_param: Option<BingxSwapLogin>, ws_type: BingxSwapWsType) -> BingxSwapWs {
  67. return Self::new_with_tag("default-BingxSwapWs".to_string(), is_colo, login_param, ws_type);
  68. }
  69. pub fn new_with_tag(tag: String, is_colo: bool, login_param: Option<BingxSwapLogin>, ws_type: BingxSwapWsType) -> BingxSwapWs {
  70. /*******公共频道-私有频道数据组装*/
  71. let address_url = match ws_type {
  72. BingxSwapWsType::PublicAndPrivate => {
  73. let url = "wss://open-api-swap.bingx.com/swap-market".to_string();
  74. info!("走普通通道(不支持colo通道):{}", url);
  75. url
  76. }
  77. };
  78. /*******公共频道-私有频道数据组装*/
  79. let ws_param = BingxSwapWsParam {
  80. token: "".to_string(),
  81. ws_url: "".to_string(),
  82. ws_ping_interval: 0,
  83. ws_ping_timeout: 0,
  84. is_ok_subscribe: false,
  85. };
  86. if is_colo {
  87. info!("开启高速(未配置,走普通:{})通道",address_url);
  88. } else {
  89. info!("走普通通道:{}",address_url);
  90. }
  91. BingxSwapWs {
  92. tag,
  93. address_url,
  94. login_param,
  95. ws_param,
  96. symbol_s: vec![],
  97. subscribe_types: vec![],
  98. heartbeat_time: 1000 * 18,
  99. }
  100. }
  101. /*******************************************************************************************************/
  102. /*****************************************订阅函数********************************************************/
  103. /*******************************************************************************************************/
  104. //手动添加订阅信息
  105. pub fn set_subscribe(&mut self, subscribe_types: Vec<BingxSwapSubscribeType>) {
  106. self.subscribe_types.extend(subscribe_types);
  107. }
  108. //手动添加币对
  109. pub fn set_symbols(&mut self, mut b_array: Vec<String>) {
  110. for symbol in b_array.iter_mut() {
  111. // 大写
  112. *symbol = symbol.to_uppercase();
  113. // 字符串替换
  114. *symbol = symbol.replace("_", "-");
  115. }
  116. self.symbol_s = b_array;
  117. }
  118. fn contains_pr(&self) -> bool {
  119. for t in self.subscribe_types.clone() {
  120. if match t {
  121. BingxSwapSubscribeType::PuFuturesTrades => false,
  122. BingxSwapSubscribeType::PuFuturesRecords => false,
  123. BingxSwapSubscribeType::PuFuturesDepth => false,
  124. } {
  125. return true;
  126. }
  127. }
  128. false
  129. }
  130. /*******************************************************************************************************/
  131. /*****************************************工具函数********************************************************/
  132. /*******************************************************************************************************/
  133. //订阅枚举解析
  134. pub fn enum_to_string(symbol: String, subscribe_type: BingxSwapSubscribeType) -> String {
  135. match subscribe_type {
  136. BingxSwapSubscribeType::PuFuturesDepth => {
  137. format!("{}@depth5@100ms", symbol)
  138. }
  139. BingxSwapSubscribeType::PuFuturesRecords => {
  140. format!("{}@kline_1m", symbol)
  141. }
  142. BingxSwapSubscribeType::PuFuturesTrades => {
  143. format!("{}@trade", symbol)
  144. }
  145. }
  146. }
  147. //订阅信息生成
  148. pub fn get_subscription(&self) -> Vec<String> {
  149. let mut array = vec![];
  150. for symbol in &self.symbol_s {
  151. for subscribe_type in &self.subscribe_types {
  152. let ty_str = Self::enum_to_string(symbol.clone(), subscribe_type.clone());
  153. let str = json!({
  154. "id": "id1",
  155. "reqType": "sub",
  156. "dataType": ty_str.to_string()
  157. });
  158. array.push(str.to_string());
  159. }
  160. }
  161. array
  162. }
  163. /*******************************************************************************************************/
  164. /*****************************************socket基本*****************************************************/
  165. /*******************************************************************************************************/
  166. //链接
  167. pub async fn ws_connect_async<F, Future>(&mut self,
  168. is_shutdown_arc: Arc<AtomicBool>,
  169. handle_function: F,
  170. _write_tx_am: &Arc<Mutex<UnboundedSender<Message>>>,
  171. write_to_socket_rx: UnboundedReceiver<Message>) -> Result<(), Error>
  172. where
  173. F: Fn(ResponseData) -> Future + Clone + Send + 'static + Sync,
  174. Future: std::future::Future<Output=()> + Send + 'static, // 确保 Fut 是一个 Future,且输出类型为 ()
  175. {
  176. let login_is = self.contains_pr();
  177. let subscription = self.get_subscription();
  178. let address_url = self.address_url.clone();
  179. let tag = self.tag.clone();
  180. // let heartbeat_time = self.ws_param.ws_ping_interval.clone();
  181. //心跳-- 方法内部线程启动
  182. // let write_tx_clone1 = write_tx_am.clone();
  183. // tokio::spawn(async move {
  184. // trace!("线程-异步心跳-开始");
  185. // AbstractWsMode::ping_or_pong(write_tx_clone1, HeartbeatType::Ping, heartbeat_time as u64).await;
  186. // trace!("线程-异步心跳-结束");
  187. // });
  188. //设置订阅
  189. let subscribe_array = subscription.clone();
  190. if login_is {
  191. //登录相关
  192. }
  193. //1 链接
  194. let t2 = tokio::spawn(async move {
  195. let write_to_socket_rx_arc = Arc::new(Mutex::new(write_to_socket_rx));
  196. loop {
  197. info!("Bingx_usdt_swap socket 连接中……");
  198. AbstractWsMode::ws_connect_async(is_shutdown_arc.clone(), handle_function.clone(), address_url.clone(),
  199. false, tag.clone(), subscribe_array.clone(), write_to_socket_rx_arc.clone(),
  200. Self::message_text, Self::message_ping, Self::message_pong, Self::message_binary).await;
  201. error!("Bingx_usdt_swap socket 断连,1s以后重连……");
  202. tokio::time::sleep(Duration::from_secs(1)).await;
  203. }
  204. });
  205. tokio::try_join!(t2).unwrap();
  206. trace!("线程-心跳与链接-结束");
  207. Ok(())
  208. }
  209. /*******************************************************************************************************/
  210. /*****************************************数据解析*****************************************************/
  211. /*******************************************************************************************************/
  212. //数据解析-Text
  213. pub fn message_text(text: String) -> Option<ResponseData> {
  214. let response_data = Self::ok_text(text);
  215. Option::from(response_data)
  216. }
  217. //数据解析-ping
  218. pub fn message_ping(_pi: Vec<u8>) -> Option<ResponseData> {
  219. return Option::from(ResponseData::new("".to_string(), -300, "success".to_string(), Value::Null));
  220. }
  221. //数据解析-pong
  222. pub fn message_pong(_po: Vec<u8>) -> Option<ResponseData> {
  223. return Option::from(ResponseData::new("".to_string(), -301, "success".to_string(), Value::Null));
  224. }
  225. //数据解析-二进制
  226. pub fn message_binary(po: Vec<u8>) -> Option<ResponseData> {
  227. //二进制WebSocket消息
  228. // let message_str = format!("Binary:{:?}", _po);
  229. // Option::from(ResponseData::new("".to_string(), 2, message_str, Value::Null))
  230. // let result = String::from_utf8(bytes);
  231. // let result = String::from_utf8(po);
  232. let mut gz_decoder = GzDecoder::new(&po[..]);
  233. let mut decompressed_data = Vec::new();
  234. // 尝试解压数据
  235. if let Ok(_) = gz_decoder.read_to_end(&mut decompressed_data) {
  236. // 将解压后的字节向量转换为 UTF-8 字符串
  237. match String::from_utf8(decompressed_data) {
  238. Ok(text) => {
  239. let response_data = Self::ok_text(text);
  240. return Option::from(response_data);
  241. }
  242. Err(_) => {
  243. return Option::from(ResponseData::new("".to_string(), 400, "二进制数据转化出错".to_string(), Value::Null));
  244. }
  245. }
  246. } else {
  247. return Option::from(ResponseData::new("".to_string(), 400, "二进制数据转化出错".to_string(), Value::Null));
  248. }
  249. }
  250. //数据解析
  251. pub fn ok_text(text: String) -> ResponseData
  252. {
  253. // trace!("原始数据:{:?}",text);
  254. match text.as_str() {
  255. "Ping" => {
  256. return ResponseData::new("".to_string(), -300, "success".to_string(), Value::String(String::from("Pong")));
  257. }
  258. _ => {}
  259. }
  260. let mut res_data = ResponseData::new("".to_string(), 200, "success".to_string(), Value::Null);
  261. let json_value: serde_json::Value = serde_json::from_str(&text).unwrap();
  262. // { "id": "id1", "code": 0, "msg": "" }
  263. if json_value["id"].as_str() == Option::from("id1") {
  264. //订阅
  265. if json_value["code"].as_i64() == Option::from(0) {
  266. res_data.code = -201;
  267. res_data.message = "订阅成功".to_string();
  268. } else {
  269. res_data.code = 400;
  270. res_data.message = "订阅失败".to_string();
  271. }
  272. } else if json_value["code"].as_i64() == Option::from(0) {
  273. res_data.code = 200;
  274. res_data.data = json_value.clone();
  275. //订阅数据 甄别
  276. let data_type = json_value["dataType"].as_str().unwrap();
  277. if data_type.contains("@depth") {
  278. res_data.channel = "futures.order_book".to_string();
  279. } else if data_type.contains("@trade") {
  280. res_data.channel = "futures.trades".to_string();
  281. } else if data_type.contains("@kline_1m") {
  282. res_data.channel = "futures.candlesticks".to_string();
  283. } else {
  284. res_data.channel = "未知推送数据".to_string();
  285. }
  286. } else {
  287. res_data.code = -1;
  288. res_data.message = "未知解析".to_string();
  289. }
  290. res_data
  291. }
  292. }