bitget_swap_ws.rs 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351
  1. use std::sync::Arc;
  2. use std::sync::atomic::AtomicBool;
  3. use chrono::{Utc};
  4. use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
  5. use serde_json::{json, Value};
  6. use tracing::{error, info, trace};
  7. use ring::hmac;
  8. use tokio::sync::Mutex;
  9. use tokio_tungstenite::tungstenite::{Error, Message};
  10. use crate::response_base::ResponseData;
  11. use crate::socket_tool::{AbstractWsMode, HeartbeatType};
  12. pub enum BitgetSwapWsType {
  13. Public,
  14. Private,
  15. }
  16. #[derive(Clone)]
  17. pub enum BitgetSwapSubscribeType {
  18. PuTrade,
  19. PuBooks1,
  20. PuKline(String),
  21. PrAccount,
  22. PrPosition,
  23. PrOrders,
  24. }
  25. #[derive(Clone)]
  26. #[allow(dead_code)]
  27. pub struct BitgetSwapLogin {
  28. pub api_key: String,
  29. pub secret_key: String,
  30. pub passphrase_key: String,
  31. }
  32. #[derive(Clone)]
  33. pub struct BitgetSwapWs {
  34. label: String, // 类型
  35. address_url: String, // 地址
  36. login_param: Option<BitgetSwapLogin>, // 账号
  37. symbol_s: Vec<String>, // 币对
  38. subscribe_types: Vec<BitgetSwapSubscribeType>, // 订阅
  39. heartbeat_time: u64, // 心跳间隔
  40. }
  41. impl BitgetSwapWs {
  42. pub fn new(is_colo: bool, login_param: Option<BitgetSwapLogin>, ws_type: BitgetSwapWsType) -> BitgetSwapWs {
  43. return BitgetSwapWs::new_label("default-BitgetSwapWs".to_string(), is_colo, login_param, ws_type);
  44. }
  45. pub fn new_label(label: String, is_colo: bool, login_param: Option<BitgetSwapLogin>, ws_type: BitgetSwapWsType) -> BitgetSwapWs {
  46. let address_url = match ws_type {
  47. BitgetSwapWsType::Public => {
  48. "wss://ws.bitget.com/v2/ws/public".to_string()
  49. }
  50. BitgetSwapWsType::Private => {
  51. "wss://ws.bitget.com/v2/ws/private".to_string()
  52. }
  53. };
  54. if is_colo {
  55. info!("开启高速(未配置,走普通:{})通道",address_url);
  56. } else {
  57. info!("走普通通道:{}",address_url);
  58. }
  59. BitgetSwapWs {
  60. label,
  61. address_url,
  62. login_param,
  63. symbol_s: vec![],
  64. subscribe_types: vec![],
  65. heartbeat_time: 1000 * 10,
  66. }
  67. }
  68. // 添加订阅信息
  69. pub fn set_subscribe(&mut self, subscribe_types: Vec<BitgetSwapSubscribeType>) {
  70. self.subscribe_types.extend(subscribe_types);
  71. }
  72. // 手动添加币对
  73. pub fn set_symbols(&mut self, mut b_array: Vec<String>) {
  74. for symbol in b_array.iter_mut() {
  75. // 小写
  76. *symbol = symbol.to_uppercase();
  77. // 字符串替换
  78. *symbol = symbol.replace("-", "");
  79. *symbol = symbol.replace("_", "");
  80. }
  81. self.symbol_s = b_array;
  82. }
  83. //频道是否需要登录
  84. fn contains_pr(&self) -> bool {
  85. for t in self.subscribe_types.clone() {
  86. if match t {
  87. BitgetSwapSubscribeType::PuTrade => false,
  88. BitgetSwapSubscribeType::PuBooks1 => false,
  89. BitgetSwapSubscribeType::PuKline(_) => false,
  90. BitgetSwapSubscribeType::PrAccount => true,
  91. BitgetSwapSubscribeType::PrOrders => true,
  92. BitgetSwapSubscribeType::PrPosition => true
  93. } {
  94. return true;
  95. }
  96. }
  97. false
  98. }
  99. /*******************************************************************************************************/
  100. /*****************************************工具函数*******************************************************/
  101. /*******************************************************************************************************/
  102. // 枚举解析成json
  103. pub fn enum_to_json(symbol: String, subscribe_type: BitgetSwapSubscribeType) -> Value {
  104. match subscribe_type {
  105. // 公共订阅
  106. BitgetSwapSubscribeType::PuTrade => {
  107. json!({
  108. "instType": "USDT-FUTURES",
  109. "channel": "trade",
  110. "instId": symbol,
  111. })
  112. }
  113. BitgetSwapSubscribeType::PuBooks1 => {
  114. json!({
  115. "instType": "USDT-FUTURES",
  116. "channel": "books1",
  117. "instId": symbol,
  118. })
  119. }
  120. BitgetSwapSubscribeType::PuKline(t) => {
  121. json!({
  122. "instType": "USDT-FUTURES",
  123. "channel": format!("candle{}m", t),
  124. "instId": symbol,
  125. })
  126. }
  127. // 私有订阅
  128. BitgetSwapSubscribeType::PrAccount => {
  129. json!({
  130. "instType": "USDT-FUTURES",
  131. "channel": "account",
  132. "coin": "default",
  133. })
  134. }
  135. BitgetSwapSubscribeType::PrPosition => {
  136. json!({
  137. "instType": "USDT-FUTURES",
  138. "channel": "positions",
  139. "instId": "default"
  140. })
  141. }
  142. BitgetSwapSubscribeType::PrOrders => {
  143. json!({
  144. "instType": "USDT-FUTURES",
  145. "channel": "orders",
  146. "instId": "default"
  147. })
  148. }
  149. }
  150. }
  151. // 订阅信息生成
  152. pub fn get_subscription(&self) -> String {
  153. let mut params = vec![];
  154. for symbol in &self.symbol_s {
  155. for subscribe_type in &self.subscribe_types {
  156. let ty_str = Self::enum_to_json(symbol.clone(), subscribe_type.clone());
  157. params.push(ty_str);
  158. }
  159. }
  160. let str = json!({
  161. "op": "subscribe",
  162. "args": params
  163. });
  164. str.to_string()
  165. }
  166. /*******************************************************************************************************/
  167. /*****************************************socket基本*****************************************************/
  168. /*******************************************************************************************************/
  169. pub async fn ws_connect_async<F, Future>(&mut self,
  170. is_shutdown_arc: Arc<AtomicBool>,
  171. handle_function: F,
  172. write_tx_am: &Arc<Mutex<UnboundedSender<Message>>>,
  173. write_to_socket_rx: UnboundedReceiver<Message>) -> Result<(), Error>
  174. where
  175. F: Fn(ResponseData) -> Future + Clone + Send + 'static + Sync,
  176. Future: std::future::Future<Output=()> + Send + 'static, // 确保 Fut 是一个 Future,且输出类型为 ()
  177. {
  178. let login_is = self.contains_pr();
  179. let address_url = self.address_url.clone();
  180. let label = self.label.clone();
  181. let heartbeat_time = self.heartbeat_time.clone();
  182. // 设置订阅
  183. let subscription = self.get_subscription();
  184. let subscribe_array = vec![subscription.to_string()];
  185. info!(?subscribe_array);
  186. //心跳-- 方法内部线程启动
  187. let write_tx_clone1 = Arc::clone(write_tx_am);
  188. tokio::spawn(async move {
  189. let ping_str = json!("ping");
  190. AbstractWsMode::ping_or_pong(write_tx_clone1, HeartbeatType::Custom(ping_str.as_str().unwrap().to_string()), heartbeat_time).await;
  191. });
  192. //链接
  193. let login_param = self.login_param.clone();
  194. let write_tx_clone2 = Arc::clone(write_tx_am);
  195. let t2 = tokio::spawn(async move {
  196. let write_to_socket_rx_arc = Arc::new(Mutex::new(write_to_socket_rx));
  197. loop {
  198. info!("bitget_usdt_swap socket 连接中……");
  199. // 登录相关
  200. if login_is {
  201. let login_param_c = login_param.clone().unwrap();
  202. let timestamp = Utc::now().timestamp().to_string();
  203. // 时间戳 + 请求类型+ 请求参数字符串
  204. let message = format!("{}GET{}", timestamp, "/user/verify");
  205. let hmac_key = hmac::Key::new(hmac::HMAC_SHA256, login_param_c.secret_key.as_bytes());
  206. let result = hmac::sign(&hmac_key, &message.as_bytes());
  207. let sign = base64::encode(result);
  208. let login_json = json!({
  209. "op": "login",
  210. "args": [{
  211. "apiKey": login_param_c.api_key,
  212. "passphrase": login_param_c.passphrase_key,
  213. "timestamp": timestamp,
  214. "sign": sign
  215. }]
  216. });
  217. let login_str = login_json.to_string();
  218. info!("发起ws登录: {}", login_str);
  219. let write_tx_c = Arc::clone(&write_tx_clone2);
  220. AbstractWsMode::send_subscribe(write_tx_c, Message::Text(login_str)).await;
  221. }
  222. // ws层重连
  223. AbstractWsMode::ws_connect_async(is_shutdown_arc.clone(), handle_function.clone(), address_url.clone(),
  224. login_is, label.clone(), subscribe_array.clone(), write_to_socket_rx_arc.clone(),
  225. Self::message_text, Self::message_ping, Self::message_pong, Self::message_binary).await;
  226. error!("bitget_usdt_swap socket 断连,重连中……");
  227. }
  228. });
  229. tokio::try_join!(t2).unwrap();
  230. trace!("线程-心跳与链接-结束");
  231. Ok(())
  232. }
  233. /*******************************************************************************************************/
  234. /*****************************************数据解析*******************************************************/
  235. /******************************************************************************************************/
  236. // 数据解析-Text
  237. pub fn message_text(text: String) -> Option<ResponseData> {
  238. let response_data = Self::ok_text(text);
  239. Option::from(response_data)
  240. }
  241. // 数据解析-ping
  242. pub fn message_ping(_pi: Vec<u8>) -> Option<ResponseData> {
  243. return Option::from(ResponseData::new("".to_string(), -300, "success".to_string(), Value::Null));
  244. }
  245. // 数据解析-pong
  246. pub fn message_pong(_po: Vec<u8>) -> Option<ResponseData> {
  247. return Option::from(ResponseData::new("".to_string(), -301, "success".to_string(), Value::Null));
  248. }
  249. //数据解析-二进制
  250. pub fn message_binary(_po: Vec<u8>) -> Option<ResponseData> {
  251. //二进制WebSocket消息
  252. let message_str = format!("Binary:{:?}", _po);
  253. Option::from(ResponseData::new("".to_string(), 2, message_str, Value::Null))
  254. }
  255. //数据解析
  256. pub fn ok_text(text: String) -> ResponseData {
  257. let mut res_data = ResponseData::new("".to_string(), 200, text.clone(), Value::Null);
  258. match text.as_str() {
  259. "pong" => {
  260. res_data.code = -301;
  261. res_data.channel = "pong".to_string();
  262. res_data.message = "success".to_string();
  263. }
  264. _ => {
  265. let json_value: Value = serde_json::from_str(&text).unwrap();
  266. if json_value.get("event").is_some() && json_value["event"].as_str() == Some("login") {
  267. if json_value.get("code").is_some() && json_value["code"] == 0 {
  268. res_data.message = "登陆成功".to_string();
  269. } else {
  270. res_data.message = format!("登陆失败:({},{})", json_value.get("code").as_ref().unwrap(), json_value.get("msg").as_ref().unwrap());
  271. }
  272. res_data.channel = "login".to_string();
  273. res_data.code = -200;
  274. res_data.data = json_value;
  275. } else if json_value.get("event").is_some() && json_value["event"].as_str() == Some("subscribe") {
  276. res_data.code = -201;
  277. res_data.data = json_value.clone();
  278. res_data.channel = format!("{}", json_value["arg"]["channel"].as_str().unwrap());
  279. res_data.message = "success".to_string();
  280. } else if json_value.get("action").is_some() {
  281. let channel = format!("{}", json_value["arg"]["channel"].as_str().unwrap());
  282. res_data.data = json_value.clone();
  283. res_data.reach_time = json_value["ts"].as_i64().unwrap() * 1000;
  284. res_data.message = "success".to_string();
  285. res_data.code = 200;
  286. if channel.contains("books1") {
  287. res_data.channel = "orderbook".to_string();
  288. res_data.data_type = json_value["action"].as_str().unwrap().to_string();
  289. // bybit 时间在data块外
  290. res_data.reach_time = json_value.get("ts").unwrap().as_i64().unwrap_or(0i64);
  291. } else if channel.contains("trade") {
  292. res_data.channel = "trade".to_string();
  293. res_data.data_type = json_value["action"].as_str().unwrap().to_string();
  294. } else if channel.contains("ticker") {
  295. res_data.channel = "tickers".to_string();
  296. res_data.data["ts"] = json_value["ts"].clone();
  297. res_data.data_type = json_value["action"].as_str().unwrap().to_string();
  298. } else if channel.contains("candle") {
  299. res_data.channel = "kline".to_string();
  300. res_data.data_type = json_value["action"].as_str().unwrap().to_string();
  301. } else if channel.contains("positions") {
  302. res_data.channel = "position".to_string();
  303. res_data.data_type = json_value["action"].as_str().unwrap().to_string();
  304. } else if channel.contains("orders") {
  305. res_data.channel = "order".to_string();
  306. res_data.data_type = json_value["action"].as_str().unwrap().to_string();
  307. } else if channel.contains("account") {
  308. res_data.channel = "wallet".to_string();
  309. res_data.data_type = json_value["action"].as_str().unwrap().to_string();
  310. } else {
  311. res_data.code = -1;
  312. res_data.channel = "未知的频道".to_string();
  313. }
  314. }
  315. }
  316. }
  317. res_data
  318. }
  319. }