binance_swap_ws.rs 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258
  1. use std::sync::Arc;
  2. use std::sync::atomic::AtomicBool;
  3. use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
  4. use serde_json::json;
  5. use tokio::sync::Mutex;
  6. use tokio_tungstenite::tungstenite::{Error, Message};
  7. use tracing::{info, trace};
  8. use crate::response_base::ResponseData;
  9. use crate::socket_tool::{AbstractWsMode, HeartbeatType};
  10. use crate::utils::get_time_microsecond;
  11. //类型
  12. pub enum BinanceSwapWsType {
  13. PublicAndPrivate,
  14. }
  15. //订阅频道
  16. #[derive(Clone)]
  17. pub enum BinanceSwapSubscribeType {
  18. PuBookTicker,
  19. PuAggTrade,
  20. PuDepth20levels100ms,
  21. }
  22. //账号信息
  23. #[derive(Clone)]
  24. #[allow(dead_code)]
  25. pub struct BinanceSwapLogin {
  26. api_key: String,
  27. api_secret: String,
  28. }
  29. #[derive(Clone)]
  30. #[allow(dead_code)]
  31. pub struct BinanceSwapWs {
  32. //类型
  33. label: String,
  34. //地址
  35. address_url: String,
  36. //账号
  37. login_param: Option<BinanceSwapLogin>,
  38. //币对
  39. symbol_s: Vec<String>,
  40. //订阅
  41. subscribe_types: Vec<BinanceSwapSubscribeType>,
  42. //心跳间隔
  43. heartbeat_time: u64,
  44. }
  45. impl BinanceSwapWs {
  46. /*******************************************************************************************************/
  47. /*****************************************获取一个对象****************************************************/
  48. /*******************************************************************************************************/
  49. pub fn new(is_colo: bool, login_param: Option<BinanceSwapLogin>, ws_type: BinanceSwapWsType) -> BinanceSwapWs {
  50. return BinanceSwapWs::new_label("default-BinanceSwapWs".to_string(), is_colo, login_param, ws_type);
  51. }
  52. pub fn new_label(label: String, is_colo: bool, login_param: Option<BinanceSwapLogin>, ws_type: BinanceSwapWsType) -> BinanceSwapWs {
  53. /*******公共频道-私有频道数据组装*/
  54. let address_url = match ws_type {
  55. BinanceSwapWsType::PublicAndPrivate => {
  56. // "wss://fstream.binance.com/stream?streams=btcusdt@depth20@100ms".to_string(),
  57. "wss://fstream.binance.com/stream".to_string()
  58. }
  59. };
  60. if is_colo {
  61. info!("开启高速(未配置,走普通:{})通道",address_url);
  62. } else {
  63. info!("走普通通道:{}",address_url);
  64. }
  65. BinanceSwapWs {
  66. label,
  67. address_url,
  68. login_param,
  69. symbol_s: vec![],
  70. subscribe_types: vec![],
  71. heartbeat_time: 1000 * 20,
  72. }
  73. }
  74. /*******************************************************************************************************/
  75. /*****************************************订阅函数********************************************************/
  76. /*******************************************************************************************************/
  77. //手动添加订阅信息
  78. pub fn set_subscribe(&mut self, subscribe_types: Vec<BinanceSwapSubscribeType>) {
  79. self.subscribe_types.extend(subscribe_types);
  80. }
  81. //手动添加币对
  82. pub fn set_symbols(&mut self, mut b_array: Vec<String>) {
  83. for symbol in b_array.iter_mut() {
  84. // 小写
  85. *symbol = symbol.to_lowercase();
  86. // 字符串替换
  87. *symbol = symbol.replace("_", "");
  88. *symbol = symbol.replace("-", "");
  89. }
  90. self.symbol_s = b_array;
  91. }
  92. //频道是否需要登录
  93. fn contains_pr(&self) -> bool {
  94. for t in self.subscribe_types.clone() {
  95. if match t {
  96. BinanceSwapSubscribeType::PuBookTicker => false,
  97. BinanceSwapSubscribeType::PuAggTrade => false,
  98. BinanceSwapSubscribeType::PuDepth20levels100ms => false,
  99. } {
  100. return true;
  101. }
  102. }
  103. false
  104. }
  105. /*******************************************************************************************************/
  106. /*****************************************工具函数********************************************************/
  107. /*******************************************************************************************************/
  108. //订阅枚举解析
  109. pub fn enum_to_string(symbol: String, subscribe_type: BinanceSwapSubscribeType) -> String {
  110. match subscribe_type {
  111. BinanceSwapSubscribeType::PuAggTrade => {
  112. format!("{}@aggTrade", symbol)
  113. }
  114. BinanceSwapSubscribeType::PuDepth20levels100ms => {
  115. format!("{}@depth20@100ms", symbol)
  116. }
  117. BinanceSwapSubscribeType::PuBookTicker => {
  118. format!("{}@bookTicker", symbol)
  119. }
  120. }
  121. }
  122. //订阅信息生成
  123. pub fn get_subscription(&self) -> String {
  124. let mut params = vec![];
  125. for symbol in &self.symbol_s {
  126. for subscribe_type in &self.subscribe_types {
  127. let ty_str = Self::enum_to_string(symbol.clone(), subscribe_type.clone());
  128. params.push(ty_str);
  129. }
  130. }
  131. let str = json!({
  132. "method": "SUBSCRIBE",
  133. "params": params,
  134. "id": 1
  135. });
  136. str.to_string()
  137. }
  138. /*******************************************************************************************************/
  139. /*****************************************socket基本*****************************************************/
  140. /*******************************************************************************************************/
  141. //链接
  142. pub async fn ws_connect_async(&mut self,
  143. bool_v1: Arc<AtomicBool>,
  144. write_tx_am: &Arc<Mutex<UnboundedSender<Message>>>,
  145. write_rx: UnboundedReceiver<Message>,
  146. read_tx: UnboundedSender<ResponseData>) -> Result<(), Error>
  147. {
  148. let login_is = self.contains_pr();
  149. let subscription = self.get_subscription();
  150. let address_url = self.address_url.clone();
  151. let label = self.label.clone();
  152. let heartbeat_time = self.heartbeat_time.clone();
  153. //心跳-- 方法内部线程启动
  154. let write_tx_clone1 = Arc::clone(write_tx_am);
  155. tokio::spawn(async move {
  156. trace!("线程-异步心跳-开始");
  157. AbstractWsMode::ping_or_pong(write_tx_clone1, HeartbeatType::Pong, heartbeat_time).await;
  158. trace!("线程-异步心跳-结束");
  159. });
  160. //设置订阅
  161. let mut subscribe_array = vec![];
  162. if login_is {
  163. //登录相关
  164. }
  165. subscribe_array.push(subscription.to_string());
  166. //链接
  167. let t2 = tokio::spawn(async move {
  168. trace!("线程-异步链接-开始");
  169. AbstractWsMode::ws_connect_async(bool_v1, address_url.clone(),
  170. label.clone(), subscribe_array,
  171. write_rx, read_tx,
  172. Self::message_text,
  173. Self::message_ping,
  174. Self::message_pong,
  175. ).await.expect("币安-期货");
  176. trace!("线程-异步链接-结束");
  177. });
  178. tokio::try_join!(t2).unwrap();
  179. trace!("线程-心跳与链接-结束");
  180. Ok(())
  181. }
  182. /*******************************************************************************************************/
  183. /*****************************************数据解析*****************************************************/
  184. /*******************************************************************************************************/
  185. //数据解析-Text
  186. pub fn message_text(text: String) -> Option<ResponseData> {
  187. let mut response_data = Self::ok_text(text);
  188. response_data.time = get_time_microsecond();
  189. match response_data.code.as_str() {
  190. "200" => Option::from(response_data),
  191. "-201" => {
  192. trace!("订阅成功:{:?}", response_data);
  193. None
  194. }
  195. _ => None
  196. }
  197. }
  198. //数据解析-ping
  199. pub fn message_ping(_pi: Vec<u8>) -> Option<ResponseData> {
  200. trace!("服务器响应-ping");
  201. return None;
  202. }
  203. //数据解析-pong
  204. pub fn message_pong(_po: Vec<u8>) -> Option<ResponseData> {
  205. trace!("服务器响应-pong");
  206. return None;
  207. }
  208. //数据解析
  209. pub fn ok_text(text: String) -> ResponseData {
  210. // trace!("原始数据");
  211. // trace!(?text);
  212. let mut res_data = ResponseData::new("".to_string(), "200".to_string(), "success".to_string(), "".to_string());
  213. let json_value: serde_json::Value = serde_json::from_str(&text).unwrap();
  214. if json_value.get("result").is_some() && json_value.get("id").is_some() &&
  215. json_value.get("id").unwrap() == 1
  216. {
  217. res_data.code = "-201".to_string();
  218. res_data.message = "订阅成功".to_string();
  219. } else if json_value.get("error").is_some() {//订阅返回
  220. res_data.code = json_value["error"]["code"].to_string();
  221. res_data.message = json_value["error"]["msg"].to_string();
  222. } else if json_value.get("stream").is_some() {//订阅返回
  223. res_data.data = format!("{}", json_value.get("data").as_ref().unwrap());
  224. res_data.code = "200".to_string();
  225. let channel = format!("{}", json_value.get("stream").as_ref().unwrap());
  226. if channel.contains("@aggTrade") {
  227. res_data.channel = "aggTrade".to_string();
  228. } else if channel.contains("@depth20@100ms") {
  229. res_data.channel = "depth".to_string();
  230. } else if channel.contains("@bookTicker") {
  231. res_data.channel = "bookTicker".to_string();
  232. } else {
  233. res_data.channel = "未知的频道".to_string();
  234. }
  235. } else {
  236. res_data.data = text
  237. }
  238. res_data
  239. }
  240. }