okx_swap_test.rs 13 KB


  1. use std::collections::BTreeMap;
  2. use std::sync::Arc;
  3. use std::sync::atomic::AtomicBool;
  4. use futures_util::StreamExt;
  5. use tokio::sync::Mutex;
  6. use tracing::trace;
  7. use exchanges::okx_swap_rest::OkxSwapRest;
  8. use exchanges::okx_swap_ws::{OkxSwapLogin, OkxSwapSubscribeType, OkxSwapWs, OkxSwapWsType};
  // API credentials used by the private WebSocket test (via `OkxSwapLogin`) and by
  // `get_rest()` for the REST tests. They are intentionally empty in the repository:
  // fill them in locally before running authenticated tests and never commit real keys.
  // API key.
  9. const ACCESS_KEY: &str = "";
  // API secret.
  10. const SECRET_KEY: &str = "";
  // API passphrase.
  11. const PASS_KEY: &str = "";
  12. //ws-订阅公共频道信息
  13. #[tokio::test(flavor = "multi_thread", worker_threads = 5)]
  14. async fn ws_custom_subscribe_pu() {
  15. global::log_utils::init_log_with_trace();
  16. let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  17. let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded();
  18. let mut ws = get_ws(None, OkxSwapWsType::Public).await;
  19. ws.set_symbols(vec!["BTC_USDT".to_string()]);
  20. ws.set_subscribe(vec![
  21. // OkxSwapSubscribeType::PuBooks5,
  22. // OkxSwapSubscribeType::Putrades,
  23. OkxSwapSubscribeType::PuBooks50L2tbt,
  24. // OkxSwapSubscribeType::PuIndexTickers,
  25. ]);
  26. let write_tx_am = Arc::new(Mutex::new(write_tx));
  27. let is_shutdown_arc = Arc::new(AtomicBool::new(true));
  28. //读取
  29. let _is_shutdown_arc_clone = Arc::clone(&is_shutdown_arc);
  30. let _tr = tokio::spawn(async move {
  31. trace!("线程-数据读取-开启");
  32. loop {
  33. if let Some(data) = read_rx.next().await {
  34. trace!("读取数据data:{:?}",data)
  35. }
  36. }
  37. // trace!("线程-数据读取-结束");
  38. });
  39. //写数据
  40. // let bool_v2_clone = Arc::clone(&is_shutdown_arc);
  41. // let write_tx_clone = Arc::clone(&write_tx_am);
  42. // let su = ws.get_subscription();
  43. // let tw = tokio::spawn(async move {
  44. // trace!("线程-数据写入-开始");
  45. // loop {
  46. // tokio::time::sleep(Duration::from_millis(20 * 1000)).await;
  47. // // let close_frame = CloseFrame {
  48. // // code: CloseCode::Normal,
  49. // // reason: Cow::Borrowed("Bye bye"),
  50. // // };
  51. // // let message = Message::Close(Some(close_frame));
  52. //
  53. //
  54. // let message = Message::Text(su.clone());
  55. // AbstractWsMode::send_subscribe(write_tx_clone.clone(), message.clone()).await;
  56. // trace!("发送指令成功");
  57. // }
  58. // trace!("线程-数据写入-结束");
  59. // });
  60. let t1 = tokio::spawn(async move {
  61. //链接
  62. let bool_v3_clone = Arc::clone(&is_shutdown_arc);
  63. ws.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  64. trace!("test 唯一线程结束--");
  65. });
  66. tokio::try_join!(t1).unwrap();
  67. trace!("当此结束");
  68. trace!("重启!");
  69. trace!("参考交易所关闭");
  70. return;
  71. }
  72. //ws-订阅私有频道信息
  73. #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
  74. async fn ws_custom_subscribe_bu() {
  75. global::log_utils::init_log_with_trace();
  76. let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  77. let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded();
  78. let mut ws = get_ws(None, OkxSwapWsType::Business).await;
  79. ws.set_symbols(vec!["BTC-USD".to_string()]);
  80. ws.set_subscribe(vec![
  81. OkxSwapSubscribeType::BuIndexCandle30m,
  82. ]);
  83. let write_tx_am = Arc::new(Mutex::new(write_tx));
  84. let is_shutdown_arc = Arc::new(AtomicBool::new(true));
  85. //读取
  86. let _is_shutdown_arc_clone = Arc::clone(&is_shutdown_arc);
  87. let _tr = tokio::spawn(async move {
  88. trace!("线程-数据读取-开启");
  89. loop {
  90. if let Some(data) = read_rx.next().await {
  91. trace!("读取数据data:{:?}",data)
  92. }
  93. }
  94. // trace!("线程-数据读取-结束");
  95. });
  96. //写数据
  97. // let bool_v2_clone = Arc::clone(&is_shutdown_arc);
  98. // let write_tx_clone = Arc::clone(&write_tx_am);
  99. // let su = ws.get_subscription();
  100. // let tw = tokio::spawn(async move {
  101. // trace!("线程-数据写入-开始");
  102. // loop {
  103. // tokio::time::sleep(Duration::from_millis(20 * 1000)).await;
  104. // // let close_frame = CloseFrame {
  105. // // code: CloseCode::Normal,
  106. // // reason: Cow::Borrowed("Bye bye"),
  107. // // };
  108. // // let message = Message::Close(Some(close_frame));
  109. //
  110. //
  111. // let message = Message::Text(su.clone());
  112. // AbstractWsMode::send_subscribe(write_tx_clone.clone(), message.clone()).await;
  113. // trace!("发送指令成功");
  114. // }
  115. // trace!("线程-数据写入-结束");
  116. // });
  117. let t1 = tokio::spawn(async move {
  118. //链接
  119. let bool_v3_clone = Arc::clone(&is_shutdown_arc);
  120. ws.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  121. trace!("test 唯一线程结束--");
  122. });
  123. tokio::try_join!(t1).unwrap();
  124. trace!("当此结束");
  125. trace!("重启!");
  126. trace!("参考交易所关闭");
  127. return;
  128. }
  129. //ws-订阅私有频道信息
  130. #[tokio::test(flavor = "multi_thread", worker_threads = 5)]
  131. async fn ws_custom_subscribe_pr() {
  132. global::log_utils::init_log_with_trace();
  133. let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
  134. let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded();
  135. let btree_map = OkxSwapLogin {
  136. api_key: ACCESS_KEY.to_string(),
  137. secret_key: SECRET_KEY.to_string(),
  138. passphrase: PASS_KEY.to_string(),
  139. };
  140. let mut ws = get_ws(Option::from(btree_map), OkxSwapWsType::Private).await;
  141. ws.set_symbols(vec!["BTC-USDT".to_string()]);
  142. ws.set_subscribe(vec![
  143. OkxSwapSubscribeType::PrAccount("USDT".to_string()),
  144. OkxSwapSubscribeType::PrOrders,
  145. OkxSwapSubscribeType::PrPositions,
  146. OkxSwapSubscribeType::PrBalanceAndPosition,
  147. ]);
  148. let write_tx_am = Arc::new(Mutex::new(write_tx));
  149. let is_shutdown_arc = Arc::new(AtomicBool::new(true));
  150. //读取
  151. let _is_shutdown_arc_clone = Arc::clone(&is_shutdown_arc);
  152. let _tr = tokio::spawn(async move {
  153. trace!("线程-数据读取-开启");
  154. loop {
  155. if let Some(data) = read_rx.next().await {
  156. trace!("读取数据data:{:?}",data)
  157. }
  158. }
  159. // trace!("线程-数据读取-结束");
  160. });
  161. //写数据
  162. // let bool_v2_clone = Arc::clone(&is_shutdown_arc);
  163. // let write_tx_clone = Arc::clone(&write_tx_am);
  164. // let su = ws.get_subscription();
  165. // let tw = tokio::spawn(async move {
  166. // trace!("线程-数据写入-开始");
  167. // loop {
  168. // tokio::time::sleep(Duration::from_millis(20 * 1000)).await;
  169. // // let close_frame = CloseFrame {
  170. // // code: CloseCode::Normal,
  171. // // reason: Cow::Borrowed("Bye bye"),
  172. // // };
  173. // // let message = Message::Close(Some(close_frame));
  174. //
  175. //
  176. // let message = Message::Text(su.clone());
  177. // AbstractWsMode::send_subscribe(write_tx_clone.clone(), message.clone()).await;
  178. // trace!("发送指令成功");
  179. // }
  180. // trace!("线程-数据写入-结束");
  181. // });
  182. let t1 = tokio::spawn(async move {
  183. //链接
  184. let bool_v3_clone = Arc::clone(&is_shutdown_arc);
  185. ws.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  186. trace!("test 唯一线程结束--");
  187. });
  188. tokio::try_join!(t1).unwrap();
  189. trace!("当此结束");
  190. trace!("重启!");
  191. trace!("参考交易所关闭");
  192. return;
  193. //
  194. // let mut is_shutdown_arc = Arc::new(AtomicBool::new(true));
  195. // let mut btree_map: BTreeMap<String, String> = BTreeMap::new();
  196. //
  197. // btree_map.insert("access_key".to_string(), ACCESS_KEY.to_string());
  198. // btree_map.insert("secret_key".to_string(), SECRET_KEY.to_string());
  199. // btree_map.insert("pass_key".to_string(), PASS_KEY.to_string());
  200. //
  201. // let (tx, mut rx) = channel(1024);
  202. // let mut ws = get_ws(btree_map, OkxWsType::Private, tx).await;
  203. // ws.set_subscribe(vec![
  204. // OkxSubscribeType::PrBalanceAndPosition,
  205. // // OkxSubscribeType::PrAccount("USDT".to_string()),
  206. // OkxSubscribeType::PrOrders,
  207. // OkxSubscribeType::PrPositions,
  208. // ]);
  209. //
  210. // let t1 = tokio::spawn(async move {
  211. // ws.custom_subscribe(is_shutdown_arc, vec!["BTC-USDT".to_string()]).await;
  212. // });
  213. //
  214. // let t2 = tokio::spawn(async move {
  215. // loop {
  216. // if let Ok(received) = rx.try_recv() {
  217. // trace!( "age: {:?}", received);
  218. // }
  219. // }
  220. // });
  221. // try_join!(t1,t2).unwrap();
  222. }
  223. //rest-订单查询
  224. #[tokio::test]
  225. async fn rest_get_order_test() {
  226. global::log_utils::init_log_with_trace();
  227. let mut ret = get_rest();
  228. let req_data = ret.get_order("BTC-USDT".to_string(), "3333".to_string(), "".to_string()).await;
  229. println!("okx--订单查询--{:?}", req_data);
  230. }
  231. //rest-未完成的订单
  232. #[tokio::test]
  233. async fn rest_get_incomplete_order_test() {
  234. global::log_utils::init_log_with_trace();
  235. let mut ret = get_rest();
  236. let req_data = ret.get_incomplete_order("BTC-USDT".to_string()).await;
  237. println!("okx--未完成的订单--{:?}", req_data);
  238. }
  239. //rest-获取系统时间
  240. #[tokio::test]
  241. async fn rest_get_server_time_test() {
  242. global::log_utils::init_log_with_trace();
  243. let mut ret = get_rest();
  244. let req_data = ret.get_server_time().await;
  245. println!("okx--获取系统时间--{:?}", req_data);
  246. }
  247. //rest-查看持仓信息
  248. #[tokio::test]
  249. async fn rest_get_positions_test() {
  250. global::log_utils::init_log_with_trace();
  251. let mut ret = get_rest();
  252. let req_data = ret.get_positions("SWA1P".to_string()).await;
  253. println!("okx--查看持仓信息--{:?}", req_data);
  254. }
  255. //rest-获取单个产品行情信息
  256. #[tokio::test]
  257. async fn rest_get_ticker_test() {
  258. global::log_utils::init_log_with_trace();
  259. let mut ret = get_rest();
  260. let req_data = ret.get_ticker("BTC-USDT".to_string()).await;
  261. println!("okx--获取单个产品行情信息--{:?}", req_data);
  262. }
  263. //rest-查看账户余额
  264. #[tokio::test]
  265. async fn rest_get_balance_test() {
  266. global::log_utils::init_log_with_trace();
  267. let mut ret = get_rest();
  268. let req_data = ret.get_balance("BTC,ETH".to_string()).await;
  269. println!("okx--查看账户余额--{:?}", req_data);
  270. }
  271. //rest-获取交易产品基础信息
  272. #[tokio::test]
  273. async fn rest_get_instruments_test() {
  274. global::log_utils::init_log_with_trace();
  275. let mut ret = get_rest();
  276. let req_data = ret.get_instruments().await;
  277. println!("okx--获取交易产品基础信息--{:?}", req_data);
  278. }
  279. //rest-获取成交明细(近三天)
  280. #[tokio::test]
  281. async fn rest_get_trade_fills_test() {
  282. global::log_utils::init_log_with_trace();
  283. let mut ret = get_rest();
  284. let req_data = ret.get_trade_fills("".to_string()).await;
  285. println!("okx--获取成交明细(近三天)--{:?}", req_data);
  286. }
  287. //rest-撤单
  288. #[tokio::test]
  289. async fn rest_cancel_order_test() {
  290. global::log_utils::init_log_with_trace();
  291. let mut ret = get_rest();
  292. let req_data = ret.cancel_order("BTC-USD".to_string(), "1111".to_string(), "".to_string()).await;
  293. println!("okx--撤单--{:?}", req_data);
  294. }
  295. //rest-设置杠杆倍数
  296. #[tokio::test]
  297. async fn rest_set_leverage_test() {
  298. global::log_utils::init_log_with_trace();
  299. let mut ret = get_rest();
  300. let req_data = ret.set_leverage("BTC-USDT".to_string(), "5".to_string()).await;
  301. println!("okx--设置杠杆倍数--{:?}", req_data);
  302. }
  303. //rest-设置持仓模式
  304. #[tokio::test]
  305. async fn rest_set_position_mode_test() {
  306. global::log_utils::init_log_with_trace();
  307. let mut ret = get_rest();
  308. let req_data = ret.set_position_mode().await;
  309. println!("okx--设置持仓模式--{:?}", req_data);
  310. }
  311. //rest-获取历史订单记录(近七天)
  312. #[tokio::test]
  313. async fn rest_get_orders_history_test() {
  314. global::log_utils::init_log_with_trace();
  315. let mut ret = get_rest();
  316. let req_data = ret.get_orders_history("".to_string(),
  317. "".to_string(),
  318. "filled".to_string(),
  319. "".to_string(),
  320. "".to_string(),
  321. "".to_string(),
  322. ).await;
  323. println!("okx--获取历史订单记录--{:?}", req_data);
  324. }
  325. //rest-获取历史成交数据(近七天)
  326. #[tokio::test]
  327. async fn rest_get_trades_test() {
  328. global::log_utils::init_log_with_trace();
  329. let mut ret = get_rest();
  330. let req_data = ret.get_trades("".to_string(),
  331. "".to_string(),
  332. "".to_string(),
  333. "".to_string(),
  334. "".to_string(),
  335. "100".to_string(),
  336. ).await;
  337. println!("okx--获取历史成交数据--{:?}", req_data);
  338. }
  339. async fn get_ws(btree_map: Option<OkxSwapLogin>, type_v: OkxSwapWsType) -> OkxSwapWs {
  340. let ku_ws = OkxSwapWs::new(false, btree_map, type_v);
  341. ku_ws
  342. }
  343. fn get_rest() -> OkxSwapRest {
  344. let mut btree_map: BTreeMap<String, String> = BTreeMap::new();
  345. btree_map.insert("access_key".to_string(), ACCESS_KEY.to_string());
  346. btree_map.insert("secret_key".to_string(), SECRET_KEY.to_string());
  347. btree_map.insert("pass_key".to_string(), PASS_KEY.to_string());
  348. let okx_exc = OkxSwapRest::new(false, btree_map.clone());
  349. okx_exc
  350. }