bybit_swap_test.rs 11 KB


  1. use std::collections::BTreeMap;
  2. use std::sync::Arc;
  3. use std::sync::atomic::AtomicBool;
  4. use futures_util::StreamExt;
  5. use tokio::sync::Mutex;
  6. use tracing::trace;
  7. use exchanges::bybit_swap_rest::BybitSwapRest;
  8. use exchanges::bybit_swap_ws::{BybitSwapLogin, BybitSwapSubscribeType, BybitSwapWs, BybitSwapWsType};
  9. use exchanges::response_base::ResponseData;
  /// API access key used by the private WebSocket login and by the REST client.
  ///
  /// Left empty in source; supply a value locally before running the authenticated tests.
  const ACCESS_KEY: &str = "";
  /// API secret key paired with [`ACCESS_KEY`].
  ///
  /// Left empty in source; supply a value locally before running the authenticated tests.
  const SECRET_KEY: &str = "";
  12. //ws-订阅公共频道信息
  13. #[tokio::test(flavor = "multi_thread", worker_threads = 5)]
  14. async fn ws_custom_subscribe_pu() {
  15. global::log_utils::init_log_with_trace();
  16. let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  17. let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
  18. let mut ws = get_ws(None, BybitSwapWsType::Public);
  19. ws.set_symbols(vec!["BTC_USDT".to_string()]);
  20. ws.set_subscribe(vec![
  21. // BybitSwapSubscribeType::PuOrderBook1,
  22. // BybitSwapSubscribeType::PuOrderBook50,
  23. // BybitSwapSubscribeType::PuBlicTrade,
  24. BybitSwapSubscribeType::PuTickers,
  25. ]);
  26. let write_tx_am = Arc::new(Mutex::new(write_tx));
  27. let is_shutdown_arc = Arc::new(AtomicBool::new(true));
  28. //读取
  29. let _is_shutdown_arc_clone = Arc::clone(&is_shutdown_arc);
  30. let _tr = tokio::spawn(async move {
  31. trace!("线程-数据读取-开启");
  32. loop {
  33. if let Some(data) = read_rx.next().await {
  34. trace!("读取数据data:{:?}",data)
  35. }
  36. }
  37. // trace!("线程-数据读取-结束");
  38. });
  39. //写数据
  40. // let bool_v2_clone = Arc::clone(&is_shutdown_arc);
  41. // let write_tx_clone = Arc::clone(&write_tx_am);
  42. // let su = ws.get_subscription();
  43. // let tw = tokio::spawn(async move {
  44. // trace!("线程-数据写入-开始");
  45. // loop {
  46. // tokio::time::sleep(Duration::from_millis(20 * 1000)).await;
  47. // // let close_frame = CloseFrame {
  48. // // code: CloseCode::Normal,
  49. // // reason: Cow::Borrowed("Bye bye"),
  50. // // };
  51. // // let message = Message::Close(Some(close_frame));
  52. //
  53. //
  54. // let message = Message::Text(su.clone());
  55. // AbstractWsMode::send_subscribe(write_tx_clone.clone(), message.clone()).await;
  56. // trace!("发送指令成功");
  57. // }
  58. // trace!("线程-数据写入-结束");
  59. // });
  60. let fun = move |data: ResponseData| {
  61. // 在 async 块之前克隆 Arc
  62. // let core_arc_cc = core_arc_clone.clone();
  63. // let mul = multiplier.clone();
  64. //
  65. // let depth_asks = Arc::clone(&depth_asks);
  66. // let depth_bids = Arc::clone(&depth_bids);
  67. async move {
  68. trace!("333333333333333:ResponseData:{:?}",data);
  69. // let mut depth_asks = depth_asks.lock().await;
  70. // let mut depth_bids = depth_bids.lock().await;
  71. // 使用克隆后的 Arc,避免 move 语义
  72. // on_public_data(core_arc_cc, &mul, &data, &mut depth_asks, &mut depth_bids).await
  73. }
  74. };
  75. let t1 = tokio::spawn(async move {
  76. //链接
  77. let bool_v3_clone = Arc::clone(&is_shutdown_arc);
  78. ws.ws_connect_async(bool_v3_clone,fun, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  79. trace!("test 唯一线程结束--");
  80. });
  81. tokio::try_join!(t1).unwrap();
  82. trace!("当此结束");
  83. trace!("重启!");
  84. trace!("参考交易所关闭");
  85. return;
  86. }
  87. //ws-订阅私有频道信息
  88. #[tokio::test(flavor = "multi_thread", worker_threads = 5)]
  89. async fn ws_custom_subscribe_pr() {
  90. global::log_utils::init_log_with_trace();
  91. let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  92. let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
  93. let logparam = BybitSwapLogin {
  94. api_key: ACCESS_KEY.to_string(),
  95. secret_key: SECRET_KEY.to_string(),
  96. };
  97. let mut ws = get_ws(Option::from(logparam), BybitSwapWsType::Private);
  98. ws.set_symbols(vec!["BTC_USDT".to_string()]);
  99. ws.set_subscribe(vec![
  100. BybitSwapSubscribeType::PrPosition,
  101. // BybitSwapSubscribeType::PrExecution,
  102. // BybitSwapSubscribeType::PrOrder,
  103. // BybitSwapSubscribeType::PrWallet,
  104. ]);
  105. let write_tx_am = Arc::new(Mutex::new(write_tx));
  106. let is_shutdown_arc = Arc::new(AtomicBool::new(true));
  107. //读取
  108. let _is_shutdown_arc_clone = Arc::clone(&is_shutdown_arc);
  109. let _tr = tokio::spawn(async move {
  110. trace!("线程-数据读取-开启");
  111. loop {
  112. if let Some(data) = read_rx.next().await {
  113. trace!("读取数据data:{:?}",data)
  114. }
  115. }
  116. // trace!("线程-数据读取-结束");
  117. });
  118. //写数据
  119. // let bool_v2_clone = Arc::clone(&is_shutdown_arc);
  120. // let write_tx_clone = Arc::clone(&write_tx_am);
  121. // let su = ws.get_subscription();
  122. // let tw = tokio::spawn(async move {
  123. // trace!("线程-数据写入-开始");
  124. // loop {
  125. // tokio::time::sleep(Duration::from_millis(20 * 1000)).await;
  126. // // let close_frame = CloseFrame {
  127. // // code: CloseCode::Normal,
  128. // // reason: Cow::Borrowed("Bye bye"),
  129. // // };
  130. // // let message = Message::Close(Some(close_frame));
  131. //
  132. //
  133. // let message = Message::Text(su.clone());
  134. // AbstractWsMode::send_subscribe(write_tx_clone.clone(), message.clone()).await;
  135. // trace!("发送指令成功");
  136. // }
  137. // trace!("线程-数据写入-结束");
  138. // });
  139. let fun = move |data: ResponseData| {
  140. // 在 async 块之前克隆 Arc
  141. // let core_arc_cc = core_arc_clone.clone();
  142. // let mul = multiplier.clone();
  143. //
  144. // let depth_asks = Arc::clone(&depth_asks);
  145. // let depth_bids = Arc::clone(&depth_bids);
  146. async move {
  147. trace!("333333333333333:ResponseData:{:?}",data);
  148. // let mut depth_asks = depth_asks.lock().await;
  149. // let mut depth_bids = depth_bids.lock().await;
  150. // 使用克隆后的 Arc,避免 move 语义
  151. // on_public_data(core_arc_cc, &mul, &data, &mut depth_asks, &mut depth_bids).await
  152. }
  153. };
  154. let t1 = tokio::spawn(async move {
  155. //链接
  156. let bool_v3_clone = Arc::clone(&is_shutdown_arc);
  157. // ws.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  158. ws.ws_connect_async(bool_v3_clone,fun, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  159. trace!("test 唯一线程结束--");
  160. });
  161. tokio::try_join!(t1).unwrap();
  162. trace!("当此结束");
  163. trace!("重启!");
  164. trace!("参考交易所关闭");
  165. return;
  166. }
  167. //rest-服務器時間
  168. #[tokio::test]
  169. async fn rest_get_server_time_test() {
  170. global::log_utils::init_log_with_trace();
  171. let mut ret = get_rest();
  172. let req_data = ret.get_server_time().await;
  173. println!("Bybit--服務器時間--{:?}", req_data);
  174. }
  175. //rest-查詢最新行情信息
  176. #[tokio::test]
  177. async fn rest_get_tickers_test() {
  178. global::log_utils::init_log_with_trace();
  179. let mut ret = get_rest();
  180. let req_data = ret.get_tickers("DOGEUSDT".to_string()).await;
  181. println!("Bybit--查詢最新行情信息--{:?}", req_data);
  182. }
  183. //rest-查詢市場價格K線數據
  184. #[tokio::test]
  185. async fn rest_get_kline_test() {
  186. global::log_utils::init_log_with_trace();
  187. let mut ret = get_rest();
  188. let req_data = ret.get_record("DOGEUSDT".to_string(), "1".to_string(), "5".to_string()).await;
  189. println!("Bybit--查詢市場價格K線數據--{:?}", req_data);
  190. }
  191. //rest-查詢公告
  192. #[tokio::test]
  193. async fn rest_get_announcements_test() {
  194. global::log_utils::init_log_with_trace();
  195. let mut ret = get_rest();
  196. let req_data = ret.get_announcements().await;
  197. println!("Bybit--查詢公告--{:?}", req_data);
  198. }
  199. //rest-查詢可交易產品的規格信息
  200. #[tokio::test]
  201. async fn rest_get_instruments_info_test() {
  202. global::log_utils::init_log_with_trace();
  203. let mut ret = get_rest();
  204. let req_data = ret.get_instruments_info("BTCUSDT".to_string()).await;
  205. println!("Bybit--查詢可交易產品的規格信息--{:?}", req_data);
  206. }
  207. //rest-查詢錢包餘額
  208. #[tokio::test]
  209. async fn rest_get_account_balance_test() {
  210. global::log_utils::init_log_with_trace();
  211. let mut ret = get_rest();
  212. let req_data = ret.get_account_balance("USDT".to_string()).await;
  213. println!("Bybit--查詢錢包餘額--{:?}", req_data);
  214. }
  215. //rest-查看持仓信息
  216. #[tokio::test]
  217. async fn rest_get_positions_test() {
  218. global::log_utils::init_log_with_trace();
  219. let mut ret = get_rest();
  220. let req_data = ret.get_positions("DOGEUSDT".to_string(), "".to_string()).await;
  221. println!("Bybit--查看持仓信息--{:?}", req_data);
  222. }
  223. //rest-设置持仓模式
  224. #[tokio::test]
  225. async fn rest_set_position_mode_test() {
  226. global::log_utils::init_log_with_trace();
  227. let mut ret = get_rest();
  228. let req_data = ret.set_position_mode("DOGEUSDT".to_string(), 3).await;
  229. println!("Bybit--设置持仓模式--{:?}", req_data);
  230. }
  231. //rest-設置槓桿
  232. #[tokio::test]
  233. async fn rest_set_leverage_test() {
  234. global::log_utils::init_log_with_trace();
  235. let mut ret = get_rest();
  236. let req_data = ret.set_leverage(
  237. "DOGEUSDT".to_string(), "1".to_string()).await;
  238. println!("Bybit--設置槓桿--{:?}", req_data);
  239. }
  240. //rest-創建委託單
  241. #[tokio::test]
  242. async fn rest_swap_order_test() {
  243. global::log_utils::init_log_with_trace();
  244. let mut ret = get_rest();
  245. let params = serde_json::json!({
  246. "category":"linear",
  247. "symbol":"DOGEUSDT",
  248. "orderType":"Limit",
  249. "side":"Buy",
  250. "qty":"1",
  251. "price":"0.085",
  252. });
  253. let req_data = ret.swap_order(params).await;
  254. println!("Bybit--創建委託單--{:?}", req_data);
  255. }
  256. //rest-查詢實時委託單
  257. #[tokio::test]
  258. async fn rest_get_order_test() {
  259. global::log_utils::init_log_with_trace();
  260. let mut ret = get_rest();
  261. let req_data = ret.get_order("LINKUSDT".to_string(),
  262. "".to_string(), "".to_string()).await;
  263. println!("Bybit--查詢實時委託單--{:?}", req_data);
  264. }
  265. //rest-撤单
  266. #[tokio::test]
  267. async fn rest_cancel_order_test() {
  268. global::log_utils::init_log_with_trace();
  269. let mut ret = get_rest();
  270. let req_data = ret.cancel_order("DOGEUSDT".to_string(),
  271. "1d3ea16f-cf1c-4dab-9a79-d441a2dea549".to_string(), "".to_string()).await;
  272. println!("Bybit--撤单--{:?}", req_data);
  273. }
  274. //rest-撤銷所有訂單
  275. #[tokio::test]
  276. async fn rest_cancel_orders_test() {
  277. global::log_utils::init_log_with_trace();
  278. let mut ret = get_rest();
  279. let req_data = ret.cancel_orders("DOGEUSDT".to_string()).await;
  280. println!("Bybit--撤銷所有訂單--{:?}", req_data);
  281. }
  282. fn get_ws(btree_map: Option<BybitSwapLogin>, type_v: BybitSwapWsType) -> BybitSwapWs {
  283. let ku_ws = BybitSwapWs::new(false, btree_map, type_v);
  284. ku_ws
  285. }
  286. fn get_rest() -> BybitSwapRest {
  287. let mut btree_map: BTreeMap<String, String> = BTreeMap::new();
  288. btree_map.insert("access_key".to_string(), ACCESS_KEY.to_string());
  289. btree_map.insert("secret_key".to_string(), SECRET_KEY.to_string());
  290. let bybit_exc = BybitSwapRest::new(false, btree_map.clone());
  291. bybit_exc
  292. }