bingx_swap_ws.rs 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303
  1. use std::collections::BTreeMap;
  2. use std::io::Read;
  3. use std::str::FromStr;
  4. use std::sync::Arc;
  5. use std::sync::atomic::AtomicBool;
  6. use std::time::Duration;
  7. use flate2::read::GzDecoder;
  8. use serde_json::json;
  9. use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
  10. use serde_json::Value;
  11. use tokio::sync::Mutex;
  12. use tokio_tungstenite::tungstenite::{Error, Message};
  13. use tracing::{error, info, trace};
  14. use crate::response_base::ResponseData;
  15. use crate::socket_tool::{AbstractWsMode, HeartbeatType};
  16. //类型
  17. pub enum BingxSwapWsType {
  18. PublicAndPrivate,
  19. }
  20. #[derive(Debug)]
  21. #[derive(Clone)]
  22. pub struct BingxSwapWsParam {
  23. pub token: String,
  24. pub ws_url: String,
  25. pub ws_ping_interval: i64,
  26. pub ws_ping_timeout: i64,
  27. pub is_ok_subscribe: bool,
  28. }
  29. //订阅频道
  30. #[derive(Clone)]
  31. pub enum BingxSwapSubscribeType {
  32. // 深度
  33. PuFuturesDepth,
  34. // 公开成交
  35. PuFuturesTrades,
  36. // K线数据
  37. PuFuturesRecords,
  38. }
  39. //账号信息
  40. #[derive(Clone, Debug)]
  41. pub struct BingxSwapLogin {
  42. pub access_key: String,
  43. pub secret_key: String,
  44. pub pass_key: String,
  45. }
  46. #[derive(Clone)]
  47. #[allow(dead_code)]
  48. pub struct BingxSwapWs {
  49. //类型
  50. tag: String,
  51. //地址
  52. address_url: String,
  53. //账号
  54. login_param: Option<BingxSwapLogin>,
  55. //登录数据
  56. ws_param: BingxSwapWsParam,
  57. //币对
  58. symbol_s: Vec<String>,
  59. //订阅
  60. subscribe_types: Vec<BingxSwapSubscribeType>,
  61. //心跳间隔
  62. heartbeat_time: u64,
  63. }
  64. impl BingxSwapWs {
  65. /*******************************************************************************************************/
  66. /*****************************************获取一个对象****************************************************/
  67. /*******************************************************************************************************/
  68. pub fn new(is_colo: bool, login_param: Option<BingxSwapLogin>, ws_type: BingxSwapWsType) -> BingxSwapWs {
  69. return Self::new_with_tag("default-BingxSwapWs".to_string(), is_colo, login_param, ws_type);
  70. }
  71. pub fn new_with_tag(tag: String, is_colo: bool, login_param: Option<BingxSwapLogin>, ws_type: BingxSwapWsType) -> BingxSwapWs {
  72. /*******公共频道-私有频道数据组装*/
  73. let address_url = match ws_type {
  74. BingxSwapWsType::PublicAndPrivate => {
  75. let url = "wss://open-api-swap.bingx.com/swap-market".to_string();
  76. info!("走普通通道(不支持colo通道):{}", url);
  77. url
  78. }
  79. };
  80. /*******公共频道-私有频道数据组装*/
  81. let mut ws_param = BingxSwapWsParam {
  82. token: "".to_string(),
  83. ws_url: "".to_string(),
  84. ws_ping_interval: 0,
  85. ws_ping_timeout: 0,
  86. is_ok_subscribe: false,
  87. };
  88. if is_colo {
  89. info!("开启高速(未配置,走普通:{})通道",address_url);
  90. } else {
  91. info!("走普通通道:{}",address_url);
  92. }
  93. BingxSwapWs {
  94. tag,
  95. address_url,
  96. login_param,
  97. ws_param,
  98. symbol_s: vec![],
  99. subscribe_types: vec![],
  100. heartbeat_time: 1000 * 18,
  101. }
  102. }
  103. /*******************************************************************************************************/
  104. /*****************************************订阅函数********************************************************/
  105. /*******************************************************************************************************/
  106. //手动添加订阅信息
  107. pub fn set_subscribe(&mut self, subscribe_types: Vec<BingxSwapSubscribeType>) {
  108. self.subscribe_types.extend(subscribe_types);
  109. }
  110. //手动添加币对
  111. pub fn set_symbols(&mut self, mut b_array: Vec<String>) {
  112. for symbol in b_array.iter_mut() {
  113. // 大写
  114. *symbol = symbol.to_uppercase();
  115. // 字符串替换
  116. *symbol = symbol.replace("_", "-");
  117. }
  118. self.symbol_s = b_array;
  119. }
  120. fn contains_pr(&self) -> bool {
  121. for t in self.subscribe_types.clone() {
  122. if match t {
  123. BingxSwapSubscribeType::PuFuturesTrades => false,
  124. BingxSwapSubscribeType::PuFuturesRecords => false,
  125. BingxSwapSubscribeType::PuFuturesDepth => false,
  126. } {
  127. return true;
  128. }
  129. }
  130. false
  131. }
  132. /*******************************************************************************************************/
  133. /*****************************************工具函数********************************************************/
  134. /*******************************************************************************************************/
  135. //订阅枚举解析
  136. pub fn enum_to_string(symbol: String, subscribe_type: BingxSwapSubscribeType) -> String {
  137. match subscribe_type {
  138. BingxSwapSubscribeType::PuFuturesDepth => {
  139. format!("{}@depth5@100ms", symbol)
  140. }
  141. BingxSwapSubscribeType::PuFuturesRecords => {
  142. format!("{}@trade", symbol)
  143. }
  144. BingxSwapSubscribeType::PuFuturesTrades => {
  145. format!("{}@kline_1m", symbol)
  146. }
  147. }
  148. }
  149. //订阅信息生成
  150. pub fn get_subscription(&self) -> Vec<String> {
  151. let mut array = vec![];
  152. for symbol in &self.symbol_s {
  153. for subscribe_type in &self.subscribe_types {
  154. let ty_str = Self::enum_to_string(symbol.clone(), subscribe_type.clone());
  155. let str = json!({
  156. "id": "id1",
  157. "reqType": "sub",
  158. "dataType": ty_str.to_string()
  159. });
  160. ;
  161. array.push(str.to_string());
  162. }
  163. }
  164. array
  165. }
  166. /*******************************************************************************************************/
  167. /*****************************************socket基本*****************************************************/
  168. /*******************************************************************************************************/
  169. //链接
  170. pub async fn ws_connect_async<F, Future>(&mut self,
  171. is_shutdown_arc: Arc<AtomicBool>,
  172. handle_function: F,
  173. write_tx_am: &Arc<Mutex<UnboundedSender<Message>>>,
  174. write_to_socket_rx: UnboundedReceiver<Message>) -> Result<(), Error>
  175. where
  176. F: Fn(ResponseData) -> Future + Clone + Send + 'static + Sync,
  177. Future: std::future::Future<Output=()> + Send + 'static, // 确保 Fut 是一个 Future,且输出类型为 ()
  178. {
  179. let login_is = self.contains_pr();
  180. let subscription = self.get_subscription();
  181. let address_url = self.address_url.clone();
  182. let tag = self.tag.clone();
  183. let heartbeat_time = self.ws_param.ws_ping_interval.clone();
  184. //心跳-- 方法内部线程启动
  185. let write_tx_clone1 = write_tx_am.clone();
  186. // tokio::spawn(async move {
  187. // trace!("线程-异步心跳-开始");
  188. // AbstractWsMode::ping_or_pong(write_tx_clone1, HeartbeatType::Ping, heartbeat_time as u64).await;
  189. // trace!("线程-异步心跳-结束");
  190. // });
  191. //设置订阅
  192. let subscribe_array = subscription.clone();
  193. if login_is {
  194. //登录相关
  195. }
  196. //1 链接
  197. let t2 = tokio::spawn(async move {
  198. let write_to_socket_rx_arc = Arc::new(Mutex::new(write_to_socket_rx));
  199. loop {
  200. info!("Bingx_usdt_swap socket 连接中……");
  201. AbstractWsMode::ws_connect_async(is_shutdown_arc.clone(), handle_function.clone(), address_url.clone(),
  202. false, tag.clone(), subscribe_array.clone(), write_to_socket_rx_arc.clone(),
  203. Self::message_text, Self::message_ping, Self::message_pong, Self::message_binary).await;
  204. error!("Bingx_usdt_swap socket 断连,1s以后重连……");
  205. tokio::time::sleep(Duration::from_secs(1)).await;
  206. }
  207. });
  208. tokio::try_join!(t2).unwrap();
  209. trace!("线程-心跳与链接-结束");
  210. Ok(())
  211. }
  212. /*******************************************************************************************************/
  213. /*****************************************数据解析*****************************************************/
  214. /*******************************************************************************************************/
  215. //数据解析-Text
  216. pub fn message_text(text: String) -> Option<ResponseData> {
  217. let response_data = Self::ok_text(text);
  218. Option::from(response_data)
  219. }
  220. //数据解析-ping
  221. pub fn message_ping(_pi: Vec<u8>) -> Option<ResponseData> {
  222. return Option::from(ResponseData::new("".to_string(), -300, "success".to_string(), Value::Null));
  223. }
  224. //数据解析-pong
  225. pub fn message_pong(_po: Vec<u8>) -> Option<ResponseData> {
  226. return Option::from(ResponseData::new("".to_string(), -301, "success".to_string(), Value::Null));
  227. }
  228. //数据解析-二进制
  229. pub fn message_binary(po: Vec<u8>) -> Option<ResponseData> {
  230. //二进制WebSocket消息
  231. // let message_str = format!("Binary:{:?}", _po);
  232. // Option::from(ResponseData::new("".to_string(), 2, message_str, Value::Null))
  233. // let result = String::from_utf8(bytes);
  234. // let result = String::from_utf8(po);
  235. let mut gz_decoder = GzDecoder::new(&po[..]);
  236. let mut decompressed_data = Vec::new();
  237. // 尝试解压数据
  238. if let Ok(_) = gz_decoder.read_to_end(&mut decompressed_data) {
  239. // 将解压后的字节向量转换为 UTF-8 字符串
  240. match String::from_utf8(decompressed_data) {
  241. Ok(text) => {
  242. let response_data = Self::ok_text(text);
  243. return Option::from(response_data);
  244. }
  245. Err(e) => {
  246. return Option::from(ResponseData::new("".to_string(), 400, "二进制数据转化出错".to_string(), Value::Null));
  247. }
  248. }
  249. } else {
  250. return Option::from(ResponseData::new("".to_string(), 400, "二进制数据转化出错".to_string(), Value::Null));
  251. }
  252. }
  253. //数据解析
  254. pub fn ok_text(text: String) -> ResponseData
  255. {
  256. // trace!("原始数据:{:?}",text);
  257. let mut res_data = ResponseData::new("".to_string(), 200, "success".to_string(), Value::Null);
  258. let json_value: serde_json::Value = serde_json::from_str(&text).unwrap();
  259. // { "id": "id1", "code": 0, "msg": "" }
  260. if json_value["id"].as_str() == Option::from("id1") {
  261. //订阅
  262. if json_value["code"].as_i64() == Option::from(0) {
  263. res_data.code = -201;
  264. res_data.message = "订阅成功".to_string();
  265. }else{
  266. res_data.code = 400;
  267. res_data.message = "订阅失败".to_string();
  268. }
  269. }else if json_value["code"].as_i64() == Option::from(0){
  270. res_data.code = 200;
  271. res_data.data = json_value.clone();
  272. }else{
  273. res_data.code = -1;
  274. res_data.message = "未知解析".to_string();
  275. }
  276. res_data
  277. }
  278. }