test.rs 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547
  1. use exchanges::gate_swap_rest::GateSwapRest;
  2. use std::collections::BTreeMap;
  3. use tokio::io::{AsyncReadExt};
  4. use exchanges::kucoin_swap_rest::KucoinSwapRest;
  5. use exchanges::kucoin_swap_ws::{KucoinSwapSubscribeType, KucoinSwapWs, KucoinSwapWsType};
  6. use exchanges::{proxy};
  7. use exchanges::okx_swap_ws::{OkxSwapSubscribeType, OkxSwapWs, OkxSwapWsType};
  8. use std::io::{Read, Write};
  9. use std::sync::Arc;
  10. use std::sync::atomic::AtomicBool;
  11. use tokio::sync::mpsc::channel;
  12. use tokio::try_join;
  13. use tracing::{trace};
  14. use tracing::instrument::WithSubscriber;
  15. use exchanges::binance_swap_rest::BinanceSwapRest;
  16. use exchanges::binance_swap_ws::{BinanceSwapSubscribeType, BinanceSwapWs, BinanceSwapWsType};
  17. use exchanges::okx_swap_rest::OkxSwapRest;
  18. #[tokio::test]
  19. async fn test_import() {
  20. global::log_utils::init_log_with_trace();
  21. /*******走代理:根据环境变量配置来决定,如果配置了走代理,没有配置不走*******/
  22. if proxy::ParsingDetail::http_enable_proxy() {
  23. trace!("检测有代理配置,配置走代理");
  24. }
  25. //获取代理
  26. // demo_get_http_proxy();
  27. //币安---深度socket-公共频道订阅
  28. // demo_pub_ws_ba().await;
  29. // 币安-rest-获取账户信息
  30. // demo_rest_ba().await;
  31. //本次更新成功
  32. //gate-rest -账户信息
  33. // demo_rest_gate().await;
  34. //gate-ws-public-private频道
  35. // demo_ws_gate().await;
  36. //kucoin_rest -账户信息
  37. // demo_rest_kucoin().await;
  38. //Kucoin-ws--公共频道
  39. // demo_ws_kucoin_pu().await;
  40. //Kucoin-ws--私有频道
  41. // demo_ws_kucoin_pr().await;
  42. //okx - Business 频道
  43. // demo_ws_okx_bu().await;
  44. //okx - public 频道
  45. // demo_ws_okx_pu().await;
  46. //okx - rest 频道
  47. // demo_okx_rest().await;
  48. // demo_so();
  49. // let mut ku_ws = KucoinSwapWs::new(false, btree_map.clone(),
  50. // KucoinWsType::Private, tx).await;
  51. // ku_ws.set_subscribe(vec![KucoinSubscribeType::PrContractMarketTradeOrdersSys]);
  52. //
  53. // let t1 = tokio::spawn(async move {
  54. // ku_ws.custom_subscribe(vec!["ACHUSDTM".to_string(), "ROSEUSDTM".to_string()]).await;
  55. // });
  56. //
  57. // let t2 = tokio::spawn(async move {
  58. // let mut stdout = std::io::stdout();
  59. // loop {
  60. // if let Ok(received) = rx.try_recv() {
  61. // writeln!(stdout, "age: {:?}", received).unwrap();
  62. // }
  63. // }
  64. // let t01 = tokio::spawn(async move {
  65. // loop {
  66. // tokio::time::sleep(Duration::from_secs(2)).await;
  67. // trace!( "发送-指令: ");
  68. // }
  69. // });
  70. // let t02 = tokio::spawn(async move {
  71. // loop {
  72. // tokio::time::sleep(Duration::from_secs(3)).await;
  73. // trace!( "接收 -指令: ");
  74. // }
  75. // });
  76. // try_join!(t01,t02).unwrap();
  77. // let (mut tx_end, mut rx_end) = channel::<String>(1024);
  78. // let (mut tx_read, mut rx_read) = channel::<String>(1024);
  79. // let mut stream = tokio::net::TcpStream::connect("127.0.0.1:8080").await.unwrap();
  80. // stream.write_all( b"Hello, server!").await.unwrap();
  81. // stream.flush().await.unwrap();
  82. // let mutex_stream = Arc::new(Mutex::new(stream));
  83. //
  84. // //捕捉用户的主动发送的订阅指令
  85. // let stream_clone = Arc::clone(&mutex_stream);
  86. // let t_1 = tokio::spawn(async move {
  87. // loop {
  88. // tokio::time::sleep(Duration::from_secs(1)).await;
  89. // if let Ok(received) = rx_end.try_recv() {
  90. // trace!("动态订阅内容: {:?}", received);
  91. // let mut stream_lock = stream_clone.lock().await;
  92. // // stream_lock.write_all( b"1111!").await.unwrap();
  93. // // stream_lock.flush().await.unwrap();
  94. //
  95. // // stream_lock.write_all(b"Hello, server!").await.unwrap();
  96. // }
  97. // }
  98. // });
  99. //
  100. //
  101. // //socket数据获取,装入回显通道
  102. // let stream_clone = Arc::clone(&mutex_stream);
  103. // let t_2 = tokio::spawn(async move {
  104. // // 创建一个用于存储服务器响应的缓冲区
  105. // let mut buffer = [0; 512];
  106. // loop {
  107. // let mut stream_lock = stream_clone.lock().await;
  108. // tokio::time::sleep(Duration::from_secs(1)).await;
  109. //
  110. // let _ = match stream_lock.read(&mut buffer).await {
  111. // Ok(n) => {
  112. // tokio::time::sleep(Duration::from_secs(1)).await;
  113. // if n == 0 {
  114. // // 没有读取到数据
  115. // trace!("没有数据,主动发送");
  116. // } else {
  117. // // 打印服务器的响应
  118. // trace!("有数据: {}", String::from_utf8_lossy(&buffer[..n]));
  119. // tx_read.send(format!("{}", String::from_utf8_lossy(&buffer[..n]))).await.unwrap()
  120. // }
  121. // }
  122. // Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
  123. // // 如果读取操作会阻塞,则等待一会儿再试
  124. // trace!("Would block, sleeping会阻碍睡眠??");
  125. // tokio::time::sleep(Duration::from_secs(3)).await;
  126. // continue;
  127. // }
  128. // Err(e) => {
  129. // trace!("Err:{:?}",e);
  130. // break;
  131. // }
  132. // };
  133. // }
  134. // });
  135. //
  136. //
  137. // //socket 数据回显
  138. // let t02 = tokio::spawn(async move {
  139. // loop {
  140. // tokio::time::sleep(Duration::from_secs(1)).await;
  141. // // tx.send("hai!!!".to_string()).await.unwrap();
  142. // if let Ok(received) = rx_read.try_recv() {
  143. // trace!("拿到 socket 的数据: {:?}", received);
  144. // }
  145. // }
  146. // });
  147. //
  148. // //模拟用户动态发送
  149. // let t03 = tokio::spawn(async move {
  150. // loop {
  151. // tokio::time::sleep(Duration::from_secs(1)).await;
  152. // tx_end.send("这里是 动态订阅".to_string()).await.unwrap();
  153. // }
  154. // });
  155. //
  156. // try_join!(t_1,t_2,t02,t03).unwrap();
  157. }
  158. async fn demo_okx_rest() {
  159. // let mut btree_map: BTreeMap<String, String> = BTreeMap::new();
  160. // btree_map.insert("access_key".to_string(), "".to_string());
  161. // btree_map.insert("secret_key".to_string(), "".to_string());
  162. // btree_map.insert("pass_key".to_string(), "".to_string());
  163. //
  164. // let mut okx_rest = OkxSwapRest::new(false, btree_map);
  165. // let res_data = okx_rest.get_account("BTC".to_string()).await;
  166. // trace!("okx_rest-rest - get_account- {:?}", res_data);
  167. }
  168. async fn demo_ws_gate() {}
  169. fn demo_so() {
  170. // 代理服务器地址和端口
  171. // let proxy_address = "127.0.0.1:7890";
  172. // // 目标服务器地址和端口
  173. // let target_address = "wss://ws.okx.com:8443/ws/v5/public";
  174. //
  175. // // 建立与代理服务器的连接
  176. // let mut proxy_stream = TcpStream::connect(proxy_address).expect("Failed to connect to proxy");
  177. //
  178. // // 发送代理请求
  179. // let request = format!("CONNECT {}\r\n\r\n", target_address);
  180. // proxy_stream.write_all(request.as_bytes()).expect("Failed to send proxy request");
  181. //
  182. // // 读取代理响应
  183. // let mut response = String::new();
  184. // proxy_stream.read_to_string(&mut response).expect("Failed to read proxy response");
  185. //
  186. // // 检查代理响应是否成功
  187. // if !response.contains("200 Connection established") {
  188. // trace!("Proxy connection failed: {}", response);
  189. // }
  190. //
  191. // // 从代理连接中获取原始 TCP 流
  192. // let mut tcp_stream = std::mem::ManuallyDrop::new(proxy_stream);
  193. //
  194. // // 现在你可以使用 `tcp_stream` 来进行与目标服务器的通信
  195. // // 例如,发送和接收数据
  196. // // tcp_stream.write_all(b"Hello, server!").expect("Failed to send data");
  197. //
  198. // thread::spawn(move || {
  199. // loop {
  200. // let mut buffer = [0u8; 1024]; // 用于存储读取的数据的缓冲区
  201. // let bytes_read = tcp_stream.read(&mut buffer).expect("Failed to read data");
  202. //
  203. // // 将读取的数据转换为字符串并打印
  204. // if let Ok(data) = std::str::from_utf8(&buffer[..bytes_read]) {
  205. // trace!("Received data: {}", data);
  206. // } else {
  207. // trace!("Received data contains non-UTF8 characters");
  208. // }
  209. // }
  210. //
  211. // });
  212. //
  213. // // 最后记得手动释放套接字
  214. // // 注意:不要调用 `drop(tcp_stream)`,因为我们使用了 `ManuallyDrop` 来避免自动释放
  215. // unsafe {
  216. // std::mem::ManuallyDrop::drop(&mut tcp_stream);
  217. // }
  218. }
  219. async fn demo_ws_okx_pu() {
  220. // let btree_map: BTreeMap<String, String> = BTreeMap::new();
  221. // let (tx, mut rx) = channel(1024);
  222. // let mut ku_ws = OkxSwapWs::new(false, btree_map, OkxWsType::Public, tx);
  223. // ku_ws.set_subscribe(vec![OkxSubscribeType::PuIndexTickers]);
  224. // let t1 = tokio::spawn(async move {
  225. // ku_ws.custom_subscribe(vec!["BTC-USD".to_string()]).await;
  226. // });
  227. // let t2 = tokio::spawn(async move {
  228. // let mut stdout = std::io::stdout();
  229. // loop {
  230. // if let Ok(received) = rx.try_recv() {
  231. // writeln!(stdout, "age: {:?}", received).unwrap();
  232. // }
  233. // }
  234. // });
  235. // try_join!(t1,t2).unwrap();
  236. }
  237. async fn demo_ws_okx_bu() {
  238. // let btree_map: BTreeMap<String, String> = BTreeMap::new();
  239. // let (tx, mut rx) = channel(1024);
  240. // let mut ku_ws = OkxSwapWs::new(false, btree_map, OkxWsType::Business, tx);
  241. // ku_ws.set_subscribe(vec![OkxSubscribeType::BuIndexCandle30m]);
  242. // let t1 = tokio::spawn(async move {
  243. // ku_ws.custom_subscribe(vec!["BTC-USD".to_string()]).await;
  244. // });
  245. // let t2 = tokio::spawn(async move {
  246. // let mut stdout = std::io::stdout();
  247. // loop {
  248. // if let Ok(received) = rx.try_recv() {
  249. // writeln!(stdout, "age: {:?}", received).unwrap();
  250. // }
  251. // }
  252. // });
  253. // try_join!(t1,t2).unwrap();
  254. }
  255. async fn demo_ws_kucoin_pr() {
  256. // let mut is_shutdown_arc = Arc::new(AtomicBool::new(true));
  257. // let mut btree_map: BTreeMap<String, String> = BTreeMap::new();
  258. // btree_map.insert("access_key".to_string(), "".to_string());
  259. // btree_map.insert("secret_key".to_string(), "".to_string());
  260. // btree_map.insert("pass_key".to_string(), "".to_string());
  261. // trace!("----------------------btree_map{:?}", btree_map.clone());
  262. // let (tx, mut rx) = channel(1024);
  263. // let mut ku_ws = KucoinSwapWs::new(false, btree_map.clone(),
  264. // KucoinWsType::Private, tx).await;
  265. // ku_ws.set_subscribe(vec![KucoinSubscribeType::PrContractMarketTradeOrdersSys]);
  266. //
  267. // let t1 = tokio::spawn(async move {
  268. // ku_ws.custom_subscribe(is_shutdown_arc, vec!["ACHUSDTM".to_string(), "ROSEUSDTM".to_string()]).await;
  269. // });
  270. //
  271. // let t2 = tokio::spawn(async move {
  272. // loop {
  273. // if let Ok(received) = rx.try_recv() {
  274. // trace!( "age: {:?}", received);
  275. // }
  276. // }
  277. // });
  278. //
  279. // try_join!(t1,t2).unwrap();
  280. }
  281. async fn demo_ws_kucoin_pu() {
  282. // let mut is_shutdown_arc = Arc::new(AtomicBool::new(true));
  283. // let btree_map: BTreeMap<String, String> = BTreeMap::new();
  284. // let (tx, mut rx) = channel(1024);
  285. // let mut ku_ws = KucoinSwapWs::new(false, btree_map, KucoinWsType::Public, tx).await;
  286. // ku_ws.set_subscribe(vec![
  287. // KucoinSubscribeType::PuContractMarketLevel2Depth50,
  288. // KucoinSubscribeType::PuContractMarkettickerV2,
  289. // ]);
  290. //
  291. // let t1 = tokio::spawn(async move {
  292. // ku_ws.custom_subscribe(is_shutdown_arc, vec!["ACHUSDTM".to_string(), "ROSEUSDTM".to_string()]).await;
  293. // });
  294. // let t2 = tokio::spawn(async move {
  295. // loop {
  296. // if let Ok(received) = rx.try_recv() {
  297. // trace!("age: {:?}", received);
  298. // }
  299. // }
  300. // });
  301. //
  302. // try_join!(t1,t2).unwrap();
  303. }
  304. async fn demo_rest_kucoin() {
  305. let mut btree_map: BTreeMap<String, String> = BTreeMap::new();
  306. btree_map.insert("access_key".to_string(), "".to_string());
  307. btree_map.insert("secret_key".to_string(), "".to_string());
  308. btree_map.insert("pass_key".to_string(), "".to_string());
  309. let mut kucoin_exc = KucoinSwapRest::new(false, btree_map);
  310. // let res_data = kucoin_exc.get_server_time().await;
  311. // trace!("kucoin_exc-rest - get_server_time- {:?}", res_data);
  312. // trace!("kucoin_exc-rest-get_delays{:?}", kucoin_exc.get_delays() );
  313. // trace!("kucoin_exc-rest -get_avg_delay{:?}", kucoin_exc.get_avg_delay());
  314. // let res_data = kucoin_exc.get_account("USDT".to_string()).await;
  315. // trace!("kucoin_exc-rest - get_account- {:?}", res_data);
  316. // let res_data = kucoin_exc.get_position("XBT1USDM".to_string()).await;
  317. // trace!("kucoin_exc-rest - get_position- {:?}", res_data);
  318. // let res_data = kucoin_exc.get_market_details().await;
  319. // trace!("kucoin_exc-rest - get_market_details- {:?}", res_data);
  320. // let res_data = kucoin_exc.get_ticker("ROSEUSDTM".to_string()).await;
  321. // trace!("kucoin_exc-rest - get_ticker- {:?}", res_data);
  322. let res_data = kucoin_exc.get_orders("".to_string(),
  323. "".to_string()).await;
  324. trace!("kucoin_exc-rest - get_orders- {:?}", res_data);
  325. // let res_data = kucoin_exc.get_positions("USDT".to_string()).await;
  326. // trace!("kucoin_exc-rest - get_positions- {:?}", res_data);
  327. // let res_data = kucoin_exc.get_orders_details("111".to_string(), "".to_string()).await;
  328. // trace!("kucoin_exc-rest - get_orders_details- {:?}", res_data);
  329. // let res_data = kucoin_exc.swap_bazaar_order(
  330. // "cs_202309111808".to_string(),
  331. // "ROSEUSDTM".to_string(),
  332. // "pd".to_string(),
  333. // 1,
  334. // "10".to_string(),
  335. // "0.03856".to_string(),
  336. // "limit".to_string(),
  337. // ).await;
  338. // trace!("kucoin_exc-rest - swap_bazaar_order- {:?}
  339. // let res_data = kucoin_exc.cancel_order("".to_string(), "999999".to_string()).await;
  340. // trace!("kucoin_exc-rest - cancel_order- {:?}", res_data);
  341. // let res_data = kucoin_exc.get_public_token().await;
  342. // trace!("kucoin_exc-rest - get_public_token- {:?}", res_data);
  343. }
  344. async fn demo_rest_gate() {
  345. let mut btree_map: BTreeMap<String, String> = BTreeMap::new();
  346. btree_map.insert("access_key".to_string(), "".to_string());
  347. btree_map.insert("secret_key".to_string(), "".to_string());
  348. let mut gate_exc = GateSwapRest::new(false, btree_map);
  349. let res_data = gate_exc.get_account("usdt".to_string()).await;
  350. trace!("gate-rest -账户信息{:?}", res_data);
  351. trace!("gate-rest -get_delays{:?}", gate_exc.get_delays() );
  352. trace!("gate-rest -get_avg_delay{:?}", gate_exc.get_avg_delay());
  353. // let res_data = gate_exc.get_position("usdt".to_string(), "CYBER_USDT".to_string()).await;
  354. // trace!("gate-rest -持仓信息{:?}", res_data);
  355. // let res_data = gate_exc.get_ticker("usdt".to_string()).await;
  356. // trace!("gate-rest -ticker{:?}", res_data);
  357. // let res_data = gate_exc.get_server_time().await;
  358. // trace!("gate-rest -get_server_time{:?}", res_data);
  359. // let res_data = gate_exc.get_user_position("usdt".to_string()).await;
  360. // trace!("gate-rest -get_server_time{:?}", res_data);
  361. // let res_data = gate_exc.get_order_details("usdt".to_string(), "11335522".to_string()).await;
  362. // trace!("gate-rest -get_order_details{:?}", res_data);
  363. // let res_data = gate_exc.get_orders("usd1t".to_string(), "open".to_string()).await;
  364. // trace!("gate-rest -get_orders{:?}", res_data);
  365. // let params = serde_json::json!({
  366. // "contract":"CYBER_USDT",
  367. // "size":-1,
  368. // "price":"0",
  369. // "tif":"ioc",
  370. // });
  371. // let res_data = gate_exc.take_order("usdt".to_string(), params).await;
  372. // trace!("gate-rest -get_orders{:?}", res_data);
  373. // let res_data = gate_exc.setting_dual_mode("usdt".to_string(), true).await;
  374. // trace!("gate-rest -setting_dual_mode{:?}", res_data);
  375. // let res_data = gate_exc.setting_dual_leverage("usdt".to_string(),
  376. // "CYBER_USDT".to_string(),
  377. // "20".to_string(),
  378. // ).await;
  379. // trace!("gate-rest -setting_dual_mode{:?}", res_data);
  380. // let res_data = gate_exc.wallet_transfers("usdt".to_string(),
  381. // "CYBER_USDT".to_string(),
  382. // "20".to_string(),
  383. // ).await;
  384. // trace!("gate-rest -setting_dual_mode{:?}", res_data);
  385. // let res_data = gate_exc.cancel_order("usdt".to_string(),
  386. // "12345".to_string(),
  387. // ).await;
  388. // trace!("gate-rest -setting_dual_mode{:?}", res_data);
  389. // let res_data = gate_exc.cancel_orders("usdt".to_string(),
  390. // "CYBER_USDT".to_string(),
  391. // ).await;
  392. // trace!("gate-rest -cancel_orders{:?}", res_data);
  393. // let res_data = gate_exc.order(
  394. // "usdt".to_string(),
  395. // "long".to_string(),
  396. // "buy".to_string(),
  397. // "ROSE_USDT".to_string(),
  398. // 1,
  399. // "0.03888".to_string(),
  400. // "t-my-custom-id-001".to_string(),
  401. // ).await;
  402. // trace!("gate-rest -order{:?}", res_data);
  403. // let res_data = gate_exc.my_trades("usdt".to_string()).await;
  404. // trace!("gate-rest -my_trades{:?}", res_data);
  405. let res_data = gate_exc.account_book("usdt".to_string()).await;
  406. trace!("gate-rest -account_book{:?}", res_data);
  407. }
  408. async fn demo_rest_ba() {
  409. let mut btree_map: BTreeMap<String, String> = BTreeMap::new();
  410. btree_map.insert("access_key".to_string(), "".to_string());
  411. btree_map.insert("secret_key".to_string(), "".to_string());
  412. // let ba_exc = BinanceUsdtSwapRest::new(false, btree_map);
  413. // let res_data = ba_exc.get_account().await;
  414. let mut ba_exc = BinanceSwapRest::new(false, btree_map);
  415. // let res_data = ba_exc.get_server_time().await;
  416. // trace!("币安-rest - get_server_time{:?}", res_data);
  417. // let res_data = ba_exc.get_exchange_info().await;
  418. // trace!("币安-restget_ get_exchange_info{:?}", res_data);
  419. let res_data = ba_exc.get_account().await;
  420. trace!("币安-rest-获取账户信息{:?}", res_data);
  421. // trace!("币安-rest- -get_delays{:?}", ba_exc.get_delays() );
  422. // trace!("币安-rest--get_avg_delay{:?}", ba_exc.get_avg_delay());
  423. // let res_data = ba_exc.get_order("BTCUSDT".to_string(), 131, "".to_string()).await;
  424. // trace!("币安-rest--get_order{:?}", res_data);
  425. // let res_data = ba_exc.get_open_orders("BTCUSDT".to_string()).await;
  426. // trace!("币安-rest--get_open_orders{:?}", res_data);
  427. // let timestamp_start = chrono::Utc::now().timestamp_millis() - 60 * 100 * 1000;
  428. // let timestamp_end = chrono::Utc::now().timestamp_millis() + 60 * 100 * 1000;
  429. // let res_data = ba_exc.get_all_orders("BTCUSDT".to_string(), 1000, timestamp_start, timestamp_end).await;
  430. // trace!("币安-rest--get_all_orders{:?}", res_data);
  431. // let res_data = ba_exc.get_book_ticker("BTCUSDT".to_string()).await;
  432. // trace!("币安-rest--get_book_ticker{:?}", res_data);
  433. // let res_data = ba_exc.get_position_risk("BTCUS1DT".to_string()).await;
  434. // trace!("币安-rest--get_position_risk{:?}", res_data);
  435. // let res_data = ba_exc.change_pos_side(true).await;
  436. // trace!("币安-rest--change_pos_side{:?}", res_data);
  437. // let res_data = ba_exc.cancel_order("BTCUSDT".to_string(), 3312331, "".to_string()).await;
  438. // trace!("币安-rest--cancel_order{:?}", res_data);
  439. }
  440. async fn demo_pub_ws_ba() {
  441. // let mut is_shutdown_arc = Arc::new(AtomicBool::new(true));
  442. // let btree_map: BTreeMap<String, String> = BTreeMap::new();
  443. // let (tx, mut rx) = channel(1024);
  444. // let mut binance_ws = BinanceSwapWs::new(false, btree_map, BinanceWsType::PublicAndPrivate, tx);
  445. // binance_ws.set_subscribe(vec![
  446. // BinanceSubscribeType::PuAggTrade,
  447. // BinanceSubscribeType::PuDepth20levels100ms,
  448. // BinanceSubscribeType::PuBookTicker,
  449. // ]);
  450. // let t1 = tokio::spawn(async move {
  451. // binance_ws.custom_subscribe(is_shutdown_arc,vec!["BTCUSDT".to_string()]).await;
  452. // });
  453. // let t2 = tokio::spawn(async move {
  454. // loop {
  455. // if let Ok(received) = rx.try_recv() {
  456. // trace!( "age: {:?}", received);
  457. // }
  458. // }
  459. // });
  460. // try_join!(t1,t2).unwrap();
  461. }
  462. fn demo_get_http_proxy() {
  463. //代理地址
  464. let parsing_detail = proxy::ParsingDetail::parsing_environment_variables();
  465. trace!("----代理信息:{:?}", parsing_detail);
  466. }
  467. // /*********************web服务*/
  468. // fn demo_http() {
  469. // let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
  470. // for stream in listener.incoming() {
  471. // let stream = stream.unwrap();
  472. //
  473. // handle_connection(TcpStream::try_from(stream).unwrap());
  474. // }
  475. // }
  476. //
  477. //
  478. // fn handle_connection(mut stream: TcpStream) {
  479. // let buf_reader = BufReader::new(&mut stream);
  480. // let http_request: Vec<_> = buf_reader
  481. // .lines()
  482. // .map(|result| result.unwrap())
  483. // .take_while(|line| !line.is_empty())
  484. // .collect();
  485. // trace!("Request: {:#?}", http_request);
  486. // trace!("Request2: {:#?}", http_request[0]);
  487. // trace!("Request3: {:#?}", http_request[1]);
  488. //
  489. // let (status_line, filename) = if http_request[0] == "GET / HTTP/1.1" {
  490. // ("HTTP/1.1 200 OK", "hello.html")
  491. // } else {
  492. // ("HTTP/1.1 404 NOT FOUND", "404.html")
  493. // };
  494. //
  495. // let status_line = "HTTP/1.1 200 OK";
  496. // let contents = fs::read_to_string("src/404.html").unwrap();
  497. // let length = contents.len();
  498. //
  499. // let response =
  500. // format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
  501. // // let response = "HTTP/1.1 200 OK\r\n\r\nccccc";
  502. //
  503. // stream.write_all(response.as_bytes()).unwrap();
  504. // }