gate_swap_test.rs 8.5 KB


  1. use std::borrow::Cow;
  2. use std::collections::BTreeMap;
  3. use std::sync::Arc;
  4. use std::sync::atomic::AtomicBool;
  5. use std::time::Duration;
  6. use chrono::Utc;
  7. use futures_util::StreamExt;
  8. use serde_json::json;
  9. use tokio::sync::Mutex;
  10. use tokio_tungstenite::tungstenite::Message;
  11. use tokio_tungstenite::tungstenite::protocol::CloseFrame;
  12. use tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode;
  13. use tracing::{error, info, trace};
  14. use exchanges::gate_swap_rest::GateSwapRest;
  15. use exchanges::gate_swap_ws::{GateSwapLogin, GateSwapSubscribeType, GateSwapWs, GateSwapWsType};
  16. use exchanges::response_base::ResponseData;
  17. use exchanges::socket_tool::{AbstractWsMode, HeartbeatType};
  18. const ACCESS_KEY: &str = "";
  19. const SECRET_KEY: &str = "";
  20. //ws-订阅公共频道信息
  21. #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
  22. async fn ws_custom_subscribe() {
  23. global::log_utils::init_log_with_trace();
  24. let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  25. let (_, mut read_rx) = futures_channel::mpsc::unbounded::<ResponseData>();
  26. let write_tx_am = Arc::new(Mutex::new(write_tx));
  27. let is_shutdown_arc = Arc::new(AtomicBool::new(true));
  28. //读取
  29. let _is_shutdown_arc_clone = Arc::clone(&is_shutdown_arc);
  30. let _tr = tokio::spawn(async move {
  31. trace!("线程-数据读取-开启");
  32. loop {
  33. // 从通道中接收并丢弃所有的消息,直到通道为空
  34. while let Ok(Some(_)) = read_rx.try_next() {
  35. // 从通道中接收并丢弃所有的消息,直到通道为空
  36. while let Ok(Some(_)) = read_rx.try_next() {
  37. // 消息被忽略
  38. }
  39. }
  40. }
  41. // trace!("线程-数据读取-结束");
  42. });
  43. let param = GateSwapLogin {
  44. api_key: "".to_string(),
  45. secret: "".to_string(),
  46. };
  47. let mut ws = get_ws(Option::from(param));
  48. ws.set_symbols(vec!["BTC_USDT".to_string()]);
  49. ws.set_subscribe(vec![
  50. // GateSwapSubscribeType::PuFuturesTrades,
  51. ]);
  52. //写数据
  53. // let bool_v2_clone = Arc::clone(&is_shutdown_arc);
  54. // let write_tx_clone = Arc::clone(&write_tx_am);
  55. // let su = ws.get_subscription();
  56. // let tw_restart = tokio::spawn(async move {
  57. // loop {
  58. // tokio::time::sleep(Duration::from_millis(20 * 1000)).await;
  59. // let close_frame = CloseFrame {
  60. // code: CloseCode::Normal,
  61. // reason: Cow::Borrowed("Bye bye"),
  62. // };
  63. // let message = Message::Close(Some(close_frame));
  64. //
  65. // AbstractWsMode::send_subscribe(write_tx_clone.clone(), message.clone()).await;
  66. // }
  67. // });
  68. // let write_tx_clone2222 = Arc::clone(&write_tx_am);
  69. // let tw_ping = tokio::spawn(async move {
  70. // trace!("线程-数据写入-开始");
  71. // loop {
  72. // tokio::time::sleep(Duration::from_millis(1 * 1000)).await;
  73. // let timestamp = Utc::now().timestamp();
  74. // let ping_str = json!({
  75. // "time" : timestamp,
  76. // "channel" : "futures.ping",
  77. // });
  78. // write_tx_clone2222.lock().await.unbounded_send(Message::Text(ping_str.to_string())).expect("发送失败1");
  79. // // AbstractWsMode::ping_or_pong(write_tx_clone.clone(), HeartbeatType::Custom(ping_str.to_string()), 1000).await;
  80. // trace!("---------tw_ping");
  81. // }
  82. // trace!("线程-数据写入-结束");
  83. // });
  84. // let write_tx_clone3333333 = Arc::clone(&write_tx_am);
  85. // let tw_close = tokio::spawn(async move {
  86. // loop {
  87. // tokio::time::sleep(Duration::from_millis(3 * 1000)).await;
  88. // write_tx_clone3333333.lock().await.close_channel();
  89. // trace!("---------tw_close");
  90. // }
  91. // });
  92. let fun = move |data: ResponseData| {
  93. async move {
  94. trace!("---传入的方法~~~~{:?}", data);
  95. }
  96. };
  97. let t1 = tokio::spawn(async move {
  98. //链接
  99. let bool_v3_clone = Arc::clone(&is_shutdown_arc);
  100. ws.ws_connect_async(bool_v3_clone, fun, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  101. trace!("test 唯一线程结束--");
  102. });
  103. tokio::try_join!(t1).unwrap();
  104. trace!("当此结束");
  105. trace!("重启!");
  106. trace!("参考交易所关闭");
  107. return;
  108. }
  109. //ws-订阅公共频道信息
  110. #[tokio::test(flavor = "multi_thread", worker_threads = 3)]
  111. async fn ws_custom_subscribe2() {
  112. global::log_utils::init_log_with_trace();
  113. let (write_tx, mut write_rx) = futures_channel::mpsc::unbounded();
  114. let write_tx_am = Arc::new(Mutex::new(write_tx));
  115. let write_tx_clone = Arc::clone(&write_tx_am);
  116. let write_tx_clone2 = Arc::clone(&write_tx_am);
  117. let tw = tokio::spawn(async move {
  118. loop {
  119. tokio::time::sleep(Duration::from_millis(2 * 1000)).await;
  120. let _ = write_tx_clone.lock().await.unbounded_send(Message::Ping(Vec::from("Ping"))).expect("发送失败!!!");
  121. }
  122. });
  123. let tw2 = tokio::spawn(async move {
  124. loop {
  125. tokio::time::sleep(Duration::from_millis(10 * 1000)).await;
  126. let _ = write_tx_clone2.lock().await.close_channel();
  127. }
  128. });
  129. let t2 = tokio::spawn(async move {
  130. let mut z = 0;
  131. while let Some(message) = write_rx.next().await {
  132. trace!("输出:{:?}",message);
  133. z = z + 1;
  134. if z > 3 {
  135. write_rx.close();
  136. }
  137. }
  138. });
  139. tokio::try_join!(t2,tw).unwrap();
  140. }
  141. //rest-设置持仓模式
  142. #[tokio::test]
  143. async fn rest_cancel_order_all_test() {
  144. global::log_utils::init_log_with_trace();
  145. let mut ret = get_rest();
  146. let req_data = ret.cancel_order_all().await;
  147. println!("okx--设置持仓模式--{:?}", req_data);
  148. }
  149. //rest-下一个自动单
  150. #[tokio::test]
  151. async fn price_order_test() {
  152. global::log_utils::init_log_with_info();
  153. // let mut rest = get_rest();
  154. // let mut params = json!({});
  155. //
  156. // params["initial"] = json!({
  157. // "contract": "XRP_USDT",
  158. // "price": "0",
  159. // "tif": "ioc",
  160. // "reduce_only": true,
  161. // // [平多:close_long, 平空:close_short]
  162. // "auto_size": "close_long"
  163. // });
  164. //
  165. // params["trigger"] = json!({
  166. // // [平多:close-long-position, 平空:close-short-position]
  167. // "order_type": "close-long-position",
  168. // // 一般都默认用0
  169. // "strategy_type": 0,
  170. // // [0 - 最新成交价,1 - 标记价格,2 - 指数价格]
  171. // "price_type": 0,
  172. // // [1: 引用价格大于等于我们传的价格,2:引用价格小于等于我们传的价格]
  173. // // 在止损的情况下:
  174. // // 1 可以理解为向上突破触发价(一般是给空单用)
  175. // // 2 可以理解为向下突破触发价(一般是给多单用)
  176. // "rule": 2,
  177. // // 订单触发价格
  178. // "price": "0.5600",
  179. // });
  180. //
  181. // let response_data = rest.place_price_order("usdt".to_string(), params).await;
  182. // if response_data.code == "200" {
  183. // let response_obj: serde_json::Value = serde_json::from_str(response_data.data.as_str()).unwrap();
  184. //
  185. // info!("resp={:?}", response_obj.as_object().unwrap());
  186. // } else {
  187. // error!(?response_data);
  188. // }
  189. }
  190. #[tokio::test]
  191. async fn price_order_cancel_test() {
  192. global::log_utils::init_log_with_info();
  193. let mut rest = get_rest();
  194. // 这边取消订单只能使用系统返回的
  195. let rst = rest.cancel_price_order("usdt".to_string(), "58002898".to_string()).await;
  196. info!(?rst);
  197. }
  198. //rest-查询合约账户变更历史
  199. #[tokio::test]
  200. async fn rest_account_book_test() {
  201. global::log_utils::init_log_with_trace();
  202. let mut ret = get_rest();
  203. let req_data = ret.account_book("usdt".to_string()).await;
  204. println!("okx--查询合约账户变更历史--{:?}", req_data);
  205. }
  206. fn get_ws(btree_map: Option<GateSwapLogin>) -> GateSwapWs {
  207. let binance_ws = GateSwapWs::new(false,
  208. btree_map,
  209. GateSwapWsType::PublicAndPrivate("usdt".to_string()));
  210. binance_ws
  211. }
  212. fn get_rest() -> GateSwapRest {
  213. let mut btree_map: BTreeMap<String, String> = BTreeMap::new();
  214. btree_map.insert("access_key".to_string(), ACCESS_KEY.to_string());
  215. btree_map.insert("secret_key".to_string(), SECRET_KEY.to_string());
  216. let ba_exc = GateSwapRest::new(false, btree_map);
  217. ba_exc
  218. }