bitget_swap_ws.rs 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327
  1. use std::sync::Arc;
  2. use std::sync::atomic::AtomicBool;
  3. use std::time::Duration;
  4. use chrono::{Utc};
  5. use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
  6. use serde_json::{json, Value};
  7. use tracing::{error, info, trace};
  8. use ring::hmac;
  9. use tokio::sync::Mutex;
  10. use tokio_tungstenite::tungstenite::{Error, Message};
  11. use crate::response_base::ResponseData;
  12. use crate::socket_tool::{AbstractWsMode, HeartbeatType};
  13. pub enum BitgetSwapWsType {
  14. Public,
  15. Private,
  16. }
  17. #[derive(Clone)]
  18. pub enum BitgetSwapSubscribeType {
  19. PuTrade,
  20. PuBooks1,
  21. PrAccount,
  22. PrPosition,
  23. PrOrders,
  24. }
/// API credentials used to build the WebSocket `login` request for private channels.
///
/// Deliberately has no `Debug` derive: the fields are secrets and should not be
/// formatted into logs.
#[derive(Clone)]
// NOTE(review): every field is `pub` and read in this file, so this `allow`
// looks unnecessary — confirm before removing.
#[allow(dead_code)]
pub struct BitgetSwapLogin {
    /// API key; sent as `apiKey` in the login request.
    pub api_key: String,
    /// Secret used as the HMAC-SHA256 key that signs the login request.
    pub secret_key: String,
    /// API passphrase; sent as `passphrase` in the login request.
    pub passphrase_key: String,
}
/// Configuration for one Bitget USDT-futures WebSocket connection: the endpoint,
/// optional credentials, and the symbols/channels to subscribe to.
#[derive(Clone)]
pub struct BitgetSwapWs {
    /// Label identifying this connection; passed to the socket layer on connect.
    label: String,
    /// WebSocket endpoint URL.
    address_url: String,
    /// Account credentials; `None` when no login is configured.
    login_param: Option<BitgetSwapLogin>,
    /// Trading symbols to subscribe to (normalised by `set_symbols`).
    symbol_s: Vec<String>,
    /// Channels to subscribe to.
    subscribe_types: Vec<BitgetSwapSubscribeType>,
    /// Heartbeat interval handed to the ping task (presumably milliseconds — TODO confirm).
    heartbeat_time: u64,
}
  41. impl BitgetSwapWs {
  42. pub fn new(is_colo: bool, login_param: Option<BitgetSwapLogin>, ws_type: BitgetSwapWsType) -> BitgetSwapWs {
  43. return BitgetSwapWs::new_label("default-BitgetSwapWs".to_string(), is_colo, login_param, ws_type);
  44. }
  45. pub fn new_label(label: String, is_colo: bool, login_param: Option<BitgetSwapLogin>, ws_type: BitgetSwapWsType) -> BitgetSwapWs {
  46. let address_url = match ws_type {
  47. BitgetSwapWsType::Public => {
  48. "wss://ws.bitget.com/v2/ws/public".to_string()
  49. }
  50. BitgetSwapWsType::Private => {
  51. "wss://ws.bitget.com/v2/ws/private".to_string()
  52. }
  53. };
  54. if is_colo {
  55. info!("开启高速(未配置,走普通:{})通道",address_url);
  56. } else {
  57. info!("走普通通道:{}",address_url);
  58. }
  59. BitgetSwapWs {
  60. label,
  61. address_url,
  62. login_param,
  63. symbol_s: vec![],
  64. subscribe_types: vec![],
  65. heartbeat_time: 1000 * 20
  66. }
  67. }
  68. // 添加订阅信息
  69. pub fn set_subscribe(&mut self, subscribe_types: Vec<BitgetSwapSubscribeType>) {
  70. self.subscribe_types.extend(subscribe_types);
  71. }
  72. // 手动添加币对
  73. pub fn set_symbols(&mut self, mut b_array: Vec<String>) {
  74. for symbol in b_array.iter_mut() {
  75. // 小写
  76. *symbol = symbol.to_uppercase();
  77. // 字符串替换
  78. *symbol = symbol.replace("-", "");
  79. *symbol = symbol.replace("_", "");
  80. }
  81. self.symbol_s = b_array;
  82. }
  83. //频道是否需要登录
  84. fn contains_pr(&self) -> bool {
  85. for t in self.subscribe_types.clone() {
  86. if match t {
  87. BitgetSwapSubscribeType::PuTrade => false,
  88. BitgetSwapSubscribeType::PuBooks1 => false,
  89. BitgetSwapSubscribeType::PrAccount => true,
  90. BitgetSwapSubscribeType::PrOrders => true,
  91. BitgetSwapSubscribeType::PrPosition => true
  92. } {
  93. return true;
  94. }
  95. }
  96. false
  97. }
    /*******************************************************************************************************/
    /*****************************************Utility functions*********************************************/
    /*******************************************************************************************************/
  101. // 枚举解析成json
  102. pub fn enum_to_json(symbol: String, subscribe_type: BitgetSwapSubscribeType) -> Value {
  103. match subscribe_type {
  104. // 公共订阅
  105. BitgetSwapSubscribeType::PuTrade => {
  106. json!({
  107. "instType": "USDT-FUTURES",
  108. "channel": "trade",
  109. "instId": symbol,
  110. })
  111. },
  112. BitgetSwapSubscribeType::PuBooks1 => {
  113. json!({
  114. "instType": "USDT-FUTURES",
  115. "channel": "books1",
  116. "instId": symbol,
  117. })
  118. },
  119. // 私有订阅
  120. BitgetSwapSubscribeType::PrAccount => {
  121. json!({
  122. "instType": "USDT-FUTURES",
  123. "channel": "account",
  124. "coin": "default",
  125. })
  126. },
  127. BitgetSwapSubscribeType::PrPosition => {
  128. json!({
  129. "instType": "USDT-FUTURES",
  130. "channel": "positions",
  131. "instId": "default"
  132. })
  133. },
  134. BitgetSwapSubscribeType::PrOrders => {
  135. json!({
  136. "instType": "USDT-FUTURES",
  137. "channel": "orders",
  138. "instId": "default"
  139. })
  140. },
  141. }
  142. }
  143. // 订阅信息生成
  144. pub fn get_subscription(&self) -> String {
  145. let mut params = vec![];
  146. for symbol in &self.symbol_s {
  147. for subscribe_type in &self.subscribe_types {
  148. let ty_str = Self::enum_to_json(symbol.clone(), subscribe_type.clone());
  149. params.push(ty_str);
  150. }
  151. }
  152. let str = json!({
  153. "op": "subscribe",
  154. "args": params
  155. });
  156. str.to_string()
  157. }
  158. // 登录数据组装
  159. fn log_in_to_str(&self) -> String {
  160. let mut login_json_str = "".to_string();
  161. let mut access_key: String = "".to_string();
  162. let mut secret_key: String = "".to_string();
  163. let mut passphrase: String = "".to_string();
  164. match self.login_param.clone() {
  165. None => {}
  166. Some(param) => {
  167. access_key = param.api_key;
  168. secret_key = param.secret_key;
  169. passphrase = param.passphrase_key;
  170. }
  171. }
  172. if access_key.len() > 0 || secret_key.len() > 0 || passphrase.len() > 0 {
  173. let timestamp = Utc::now().timestamp().to_string();
  174. // 时间戳 + 请求类型+ 请求参数字符串
  175. let message = format!("{}GET{}", timestamp, "/user/verify");
  176. let hmac_key = hmac::Key::new(hmac::HMAC_SHA256, secret_key.as_bytes());
  177. let result = hmac::sign(&hmac_key, &message.as_bytes());
  178. let sign = base64::encode(result);
  179. let login_json = json!({
  180. "op": "login",
  181. "args": [{
  182. "apiKey": access_key,
  183. "passphrase": passphrase,
  184. "timestamp": timestamp,
  185. "sign": sign
  186. }]
  187. });
  188. info!("---login_json: {0}", login_json.to_string());
  189. info!("---登陆: {}", login_json.to_string());
  190. login_json_str = login_json.to_string();
  191. }
  192. login_json_str
  193. }
    /*******************************************************************************************************/
    /*****************************************Socket basics*************************************************/
    /*******************************************************************************************************/
  197. pub async fn ws_connect_async<F, Future>(&mut self,
  198. is_shutdown_arc: Arc<AtomicBool>,
  199. handle_function: F,
  200. write_tx_am: &Arc<Mutex<UnboundedSender<Message>>>,
  201. write_to_socket_rx: UnboundedReceiver<Message>) -> Result<(), Error>
  202. where
  203. F: Fn(ResponseData) -> Future + Clone + Send + 'static + Sync,
  204. Future: std::future::Future<Output=()> + Send + 'static, // 确保 Fut 是一个 Future,且输出类型为 ()
  205. {
  206. let login_is = self.contains_pr();
  207. let address_url = self.address_url.clone();
  208. let label = self.label.clone();
  209. let heartbeat_time = self.heartbeat_time.clone();
  210. // 登录相关
  211. if login_is {
  212. let login_str = self.log_in_to_str();
  213. info!("发起ws登录: {}", login_str);
  214. // TODO 这样写不能断线重连,后面再想办法修吧
  215. let write_tx_clone2 = Arc::clone(write_tx_am);
  216. AbstractWsMode::send_subscribe(write_tx_clone2, Message::Text(login_str)).await;
  217. }
  218. // 设置订阅
  219. let subscription = self.get_subscription();
  220. let mut subscribe_array = vec![subscription.to_string()];
  221. info!(?subscribe_array);
  222. //心跳-- 方法内部线程启动
  223. let write_tx_clone1 = Arc::clone(write_tx_am);
  224. tokio::spawn(async move {
  225. AbstractWsMode::ping_or_pong(write_tx_clone1, HeartbeatType::Ping, heartbeat_time).await;
  226. });
  227. //链接
  228. let t2 = tokio::spawn(async move {
  229. let write_to_socket_rx_arc = Arc::new(Mutex::new(write_to_socket_rx));
  230. loop {
  231. info!("bitget_usdt_swap socket 连接中……");
  232. // ws层重连
  233. AbstractWsMode::ws_connect_async(is_shutdown_arc.clone(), handle_function.clone(), address_url.clone(),
  234. login_is, label.clone(), subscribe_array.clone(), write_to_socket_rx_arc.clone(),
  235. Self::message_text, Self::message_ping, Self::message_pong).await;
  236. error!("bitget_usdt_swap socket 断连,1s以后重连……");
  237. tokio::time::sleep(Duration::from_secs(1)).await;
  238. }
  239. });
  240. tokio::try_join!(t2).unwrap();
  241. trace!("线程-心跳与链接-结束");
  242. Ok(())
  243. }
    /*******************************************************************************************************/
    /*****************************************Data parsing**************************************************/
    /*******************************************************************************************************/
  247. // 数据解析-Text
  248. pub fn message_text(text: String) -> Option<ResponseData> {
  249. let response_data = Self::ok_text(text);
  250. Option::from(response_data)
  251. }
  252. // 数据解析-ping
  253. pub fn message_ping(_pi: Vec<u8>) -> Option<ResponseData> {
  254. return Option::from(ResponseData::new("".to_string(), "-300".to_string(), "success".to_string(), Value::Null));
  255. }
  256. // 数据解析-pong
  257. pub fn message_pong(_po: Vec<u8>) -> Option<ResponseData> {
  258. return Option::from(ResponseData::new("".to_string(), "-301".to_string(), "success".to_string(), Value::Null));
  259. }
  260. //数据解析
  261. pub fn ok_text(text: String) -> ResponseData {
  262. let mut res_data = ResponseData::new("".to_string(), "200".to_string(), text.clone(), Value::Null);
  263. let json_value: Value = serde_json::from_str(&text).unwrap();
  264. if json_value.get("event").is_some() && json_value["event"].as_str() == Some("login") {
  265. if json_value.get("code").is_some() && json_value["code"] == 0 {
  266. res_data.message = "登陆成功".to_string();
  267. } else {
  268. res_data.message = format!("登陆失败:({},{})", json_value.get("code").as_ref().unwrap(), json_value.get("msg").as_ref().unwrap());
  269. }
  270. res_data.channel = "login".to_string();
  271. res_data.code = "-200".to_string();
  272. res_data.data = json_value;
  273. res_data
  274. } else if json_value.get("event").is_some() && json_value["event"].as_str() == Some("subscribe") {
  275. res_data.code = "-201".to_string();
  276. res_data.data = json_value.clone();
  277. res_data.channel = format!("{}", json_value["arg"]["channel"].as_str().unwrap());
  278. res_data.message = "success".to_string();
  279. res_data
  280. } else if json_value.get("action").is_some() {
  281. res_data.data = json_value["data"].clone();
  282. if res_data.data == "[]" {
  283. res_data.code = "".to_string();
  284. } else {
  285. res_data.code = "200".to_string();
  286. }
  287. res_data.message = "success".to_string();
  288. res_data.channel = format!("{}", json_value["arg"]["channel"].as_str().unwrap());
  289. res_data.reach_time = json_value["ts"].as_i64().unwrap() * 1000;
  290. res_data
  291. } else {
  292. res_data
  293. }
  294. }
  295. }