bitget_swap_ws.rs 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316
  1. use std::sync::Arc;
  2. use std::sync::atomic::AtomicBool;
  3. use chrono::{Utc};
  4. use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
  5. use serde_json::{json, Value};
  6. use tracing::{error, info, trace};
  7. use ring::hmac;
  8. use tokio::sync::Mutex;
  9. use tokio_tungstenite::tungstenite::{Error, Message};
  10. use crate::response_base::ResponseData;
  11. use crate::socket_tool::{AbstractWsMode, HeartbeatType};
  /// Selects which Bitget v2 WebSocket endpoint a [`BitgetSwapWs`] connects to.
  ///
  /// The common value-type traits are derived so the selector can be logged,
  /// compared and passed around by value.
  #[derive(Debug, Clone, Copy, PartialEq, Eq)]
  pub enum BitgetSwapWsType {
      /// The public endpoint (`/v2/ws/public`).
      Public,
      /// The private endpoint (`/v2/ws/private`).
      Private,
  }
  /// Channels that can be subscribed to.
  ///
  /// `Pu*` variants are treated as public channels and `Pr*` variants as
  /// channels that require a login (see `BitgetSwapWs::contains_pr`).
  #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
  pub enum BitgetSwapSubscribeType {
      /// Public `trade` channel.
      PuTrade,
      /// Public `books1` channel.
      PuBooks1,
      /// Private `account` channel.
      PrAccount,
      /// Private `positions` channel.
      PrPosition,
      /// Private `orders` channel.
      PrOrders,
  }
  /// API credentials used to sign the WebSocket `login` request.
  #[derive(Clone)]
  #[allow(dead_code)]
  pub struct BitgetSwapLogin {
      /// API key sent as `apiKey` in the login request.
      pub api_key: String,
      /// Secret used as the HMAC-SHA256 key when signing the login request.
      pub secret_key: String,
      /// Passphrase sent as `passphrase` in the login request.
      pub passphrase_key: String,
  }

  /// Hand-written so that formatting the credentials with `{:?}` (for example
  /// in a log line) can never print the secret key or the passphrase.
  impl std::fmt::Debug for BitgetSwapLogin {
      fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
          f.debug_struct("BitgetSwapLogin")
              .field("api_key", &self.api_key)
              .field("secret_key", &"<redacted>")
              .field("passphrase_key", &"<redacted>")
              .finish()
      }
  }
  31. #[derive(Clone)]
  32. pub struct BitgetSwapWs {
  33. label: String, // 类型
  34. address_url: String, // 地址
  35. login_param: Option<BitgetSwapLogin>, // 账号
  36. symbol_s: Vec<String>, // 币对
  37. subscribe_types: Vec<BitgetSwapSubscribeType>, // 订阅
  38. heartbeat_time: u64, // 心跳间隔
  39. }
  40. impl BitgetSwapWs {
  41. pub fn new(is_colo: bool, login_param: Option<BitgetSwapLogin>, ws_type: BitgetSwapWsType) -> BitgetSwapWs {
  42. return BitgetSwapWs::new_label("default-BitgetSwapWs".to_string(), is_colo, login_param, ws_type);
  43. }
  44. pub fn new_label(label: String, is_colo: bool, login_param: Option<BitgetSwapLogin>, ws_type: BitgetSwapWsType) -> BitgetSwapWs {
  45. let address_url = match ws_type {
  46. BitgetSwapWsType::Public => {
  47. "wss://ws.bitget.com/v2/ws/public".to_string()
  48. }
  49. BitgetSwapWsType::Private => {
  50. "wss://ws.bitget.com/v2/ws/private".to_string()
  51. }
  52. };
  53. if is_colo {
  54. info!("开启高速(未配置,走普通:{})通道",address_url);
  55. } else {
  56. info!("走普通通道:{}",address_url);
  57. }
  58. BitgetSwapWs {
  59. label,
  60. address_url,
  61. login_param,
  62. symbol_s: vec![],
  63. subscribe_types: vec![],
  64. heartbeat_time: 1000 * 10
  65. }
  66. }
  67. // 添加订阅信息
  68. pub fn set_subscribe(&mut self, subscribe_types: Vec<BitgetSwapSubscribeType>) {
  69. self.subscribe_types.extend(subscribe_types);
  70. }
  71. // 手动添加币对
  72. pub fn set_symbols(&mut self, mut b_array: Vec<String>) {
  73. for symbol in b_array.iter_mut() {
  74. // 小写
  75. *symbol = symbol.to_uppercase();
  76. // 字符串替换
  77. *symbol = symbol.replace("-", "");
  78. *symbol = symbol.replace("_", "");
  79. }
  80. self.symbol_s = b_array;
  81. }
  82. //频道是否需要登录
  83. fn contains_pr(&self) -> bool {
  84. for t in self.subscribe_types.clone() {
  85. if match t {
  86. BitgetSwapSubscribeType::PuTrade => false,
  87. BitgetSwapSubscribeType::PuBooks1 => false,
  88. BitgetSwapSubscribeType::PrAccount => true,
  89. BitgetSwapSubscribeType::PrOrders => true,
  90. BitgetSwapSubscribeType::PrPosition => true
  91. } {
  92. return true;
  93. }
  94. }
  95. false
  96. }
  97. /*******************************************************************************************************/
  98. /*****************************************工具函数*******************************************************/
  99. /*******************************************************************************************************/
  100. // 枚举解析成json
  101. pub fn enum_to_json(symbol: String, subscribe_type: BitgetSwapSubscribeType) -> Value {
  102. match subscribe_type {
  103. // 公共订阅
  104. BitgetSwapSubscribeType::PuTrade => {
  105. json!({
  106. "instType": "USDT-FUTURES",
  107. "channel": "trade",
  108. "instId": symbol,
  109. })
  110. },
  111. BitgetSwapSubscribeType::PuBooks1 => {
  112. json!({
  113. "instType": "USDT-FUTURES",
  114. "channel": "books1",
  115. "instId": symbol,
  116. })
  117. },
  118. // 私有订阅
  119. BitgetSwapSubscribeType::PrAccount => {
  120. json!({
  121. "instType": "USDT-FUTURES",
  122. "channel": "account",
  123. "coin": "default",
  124. })
  125. },
  126. BitgetSwapSubscribeType::PrPosition => {
  127. json!({
  128. "instType": "USDT-FUTURES",
  129. "channel": "positions",
  130. "instId": "default"
  131. })
  132. },
  133. BitgetSwapSubscribeType::PrOrders => {
  134. json!({
  135. "instType": "USDT-FUTURES",
  136. "channel": "orders",
  137. "instId": "default"
  138. })
  139. },
  140. }
  141. }
  142. // 订阅信息生成
  143. pub fn get_subscription(&self) -> String {
  144. let mut params = vec![];
  145. for symbol in &self.symbol_s {
  146. for subscribe_type in &self.subscribe_types {
  147. let ty_str = Self::enum_to_json(symbol.clone(), subscribe_type.clone());
  148. params.push(ty_str);
  149. }
  150. }
  151. let str = json!({
  152. "op": "subscribe",
  153. "args": params
  154. });
  155. str.to_string()
  156. }
  157. /*******************************************************************************************************/
  158. /*****************************************socket基本*****************************************************/
  159. /*******************************************************************************************************/
  160. pub async fn ws_connect_async<F, Future>(&mut self,
  161. is_shutdown_arc: Arc<AtomicBool>,
  162. handle_function: F,
  163. write_tx_am: &Arc<Mutex<UnboundedSender<Message>>>,
  164. write_to_socket_rx: UnboundedReceiver<Message>) -> Result<(), Error>
  165. where
  166. F: Fn(ResponseData) -> Future + Clone + Send + 'static + Sync,
  167. Future: std::future::Future<Output=()> + Send + 'static, // 确保 Fut 是一个 Future,且输出类型为 ()
  168. {
  169. let login_is = self.contains_pr();
  170. let address_url = self.address_url.clone();
  171. let label = self.label.clone();
  172. let heartbeat_time = self.heartbeat_time.clone();
  173. // 设置订阅
  174. let subscription = self.get_subscription();
  175. let subscribe_array = vec![subscription.to_string()];
  176. info!(?subscribe_array);
  177. //心跳-- 方法内部线程启动
  178. let write_tx_clone1 = Arc::clone(write_tx_am);
  179. tokio::spawn(async move {
  180. let ping_str = json!("ping");
  181. AbstractWsMode::ping_or_pong(write_tx_clone1, HeartbeatType::Custom(ping_str.as_str().unwrap().to_string()), heartbeat_time).await;
  182. });
  183. //链接
  184. let login_param = self.login_param.clone();
  185. let write_tx_clone2 = Arc::clone(write_tx_am);
  186. let t2 = tokio::spawn(async move {
  187. let write_to_socket_rx_arc = Arc::new(Mutex::new(write_to_socket_rx));
  188. loop {
  189. info!("bitget_usdt_swap socket 连接中……");
  190. // 登录相关
  191. if login_is {
  192. let login_param_c = login_param.clone().unwrap();
  193. let timestamp = Utc::now().timestamp().to_string();
  194. // 时间戳 + 请求类型+ 请求参数字符串
  195. let message = format!("{}GET{}", timestamp, "/user/verify");
  196. let hmac_key = hmac::Key::new(hmac::HMAC_SHA256, login_param_c.secret_key.as_bytes());
  197. let result = hmac::sign(&hmac_key, &message.as_bytes());
  198. let sign = base64::encode(result);
  199. let login_json = json!({
  200. "op": "login",
  201. "args": [{
  202. "apiKey": login_param_c.api_key,
  203. "passphrase": login_param_c.passphrase_key,
  204. "timestamp": timestamp,
  205. "sign": sign
  206. }]
  207. });
  208. let login_str = login_json.to_string();
  209. info!("发起ws登录: {}", login_str);
  210. let write_tx_c = Arc::clone(&write_tx_clone2);
  211. AbstractWsMode::send_subscribe(write_tx_c, Message::Text(login_str)).await;
  212. }
  213. // ws层重连
  214. AbstractWsMode::ws_connect_async(is_shutdown_arc.clone(), handle_function.clone(), address_url.clone(),
  215. login_is, label.clone(), subscribe_array.clone(), write_to_socket_rx_arc.clone(),
  216. Self::message_text, Self::message_ping, Self::message_pong, Self::message_binary).await;
  217. error!("bitget_usdt_swap socket 断连,重连中……");
  218. }
  219. });
  220. tokio::try_join!(t2).unwrap();
  221. trace!("线程-心跳与链接-结束");
  222. Ok(())
  223. }
  224. /*******************************************************************************************************/
  225. /*****************************************数据解析*******************************************************/
  226. /******************************************************************************************************/
  227. // 数据解析-Text
  228. pub fn message_text(text: String) -> Option<ResponseData> {
  229. let response_data = Self::ok_text(text);
  230. Option::from(response_data)
  231. }
  232. // 数据解析-ping
  233. pub fn message_ping(_pi: Vec<u8>) -> Option<ResponseData> {
  234. return Option::from(ResponseData::new("".to_string(), -300, "success".to_string(), Value::Null));
  235. }
  236. // 数据解析-pong
  237. pub fn message_pong(_po: Vec<u8>) -> Option<ResponseData> {
  238. return Option::from(ResponseData::new("".to_string(), -301, "success".to_string(), Value::Null));
  239. }
  240. //数据解析-二进制
  241. pub fn message_binary(_po: Vec<u8>) -> Option<ResponseData> {
  242. //二进制WebSocket消息
  243. let message_str = format!("Binary:{:?}", _po);
  244. Option::from(ResponseData::new("".to_string(), 2, message_str, Value::Null))
  245. }
  246. //数据解析
  247. pub fn ok_text(text: String) -> ResponseData {
  248. let mut res_data = ResponseData::new("".to_string(), 200, text.clone(), Value::Null);
  249. match text.as_str() {
  250. "pong" => {
  251. res_data.code = -301;
  252. res_data.channel = "pong".to_string();
  253. res_data.message = "success".to_string();
  254. },
  255. _ => {
  256. let json_value: Value = serde_json::from_str(&text).unwrap();
  257. if json_value.get("event").is_some() && json_value["event"].as_str() == Some("login") {
  258. if json_value.get("code").is_some() && json_value["code"] == 0 {
  259. res_data.message = "登陆成功".to_string();
  260. } else {
  261. res_data.message = format!("登陆失败:({},{})", json_value.get("code").as_ref().unwrap(), json_value.get("msg").as_ref().unwrap());
  262. }
  263. res_data.channel = "login".to_string();
  264. res_data.code = -200;
  265. res_data.data = json_value;
  266. } else if json_value.get("event").is_some() && json_value["event"].as_str() == Some("subscribe") {
  267. res_data.code = -201;
  268. res_data.data = json_value.clone();
  269. res_data.channel = format!("{}", json_value["arg"]["channel"].as_str().unwrap());
  270. res_data.message = "success".to_string();
  271. } else if json_value.get("action").is_some() {
  272. res_data.data = json_value["data"].clone();
  273. if res_data.data == "[]" {
  274. res_data.code = -1;
  275. } else {
  276. res_data.code = 200;
  277. }
  278. res_data.message = "success".to_string();
  279. res_data.channel = format!("{}", json_value["arg"]["channel"].as_str().unwrap());
  280. res_data.reach_time = json_value["ts"].as_i64().unwrap() * 1000;
  281. }
  282. }
  283. }
  284. res_data
  285. }
  286. }