exchange_libs.rs 39 KB


  1. use std::collections::{BTreeMap, HashMap};
  2. use std::{env, io, thread};
  3. use std::error::Error;
  4. use std::future::Future;
  5. use std::io::{Write};
  6. use std::net::{IpAddr, Ipv4Addr, SocketAddr};
  7. use std::str::FromStr;
  8. use std::sync::Arc;
  9. use tokio::sync::Mutex;
  10. use std::time::Duration;
  11. use chrono::Utc;
  12. use reqwest;
  13. use reqwest::header::{HeaderMap, HeaderValue};
  14. use hex;
  15. use ring::hmac;
  16. use serde_json::{json, Map, Value};
  17. use tungstenite::client::{connect_with_proxy};
  18. use tungstenite::{connect, Message};
  19. use tungstenite::protocol::WebSocketConfig;
  20. use url::Url;
  21. #[cfg(test)]
  22. mod tests {
  23. use crate::exchange_libs::{BinanceExc, http_enable_proxy, OkxExc, ResponseData};
  24. #[tokio::test]//测试获取ok 账号信息
  25. async fn test_okx_get_acc() {
  26. //开启代理
  27. http_enable_proxy();
  28. let okx_exc = OkxExc::new(
  29. "a4cf4f54-f4d3-447d-a57c-166fd1ead2e0".to_string(),
  30. "556DAB6773CA26DDAAA114F7044138CA".to_string(),
  31. "rust_Test123".to_string(),
  32. );
  33. let req: ResponseData = okx_exc.okx_acc("USDT").await;
  34. print!("---响应:code:{}", req.code);
  35. print!("---响应:mes:{}", req.message);
  36. print!("---响应:data:{}", req.data);
  37. }
  38. #[tokio::test]//测试okx 下单
  39. async fn test_okx_order() {
  40. //开启代理
  41. http_enable_proxy();
  42. let okx_exc = OkxExc::new(
  43. "a4cf4f54-f4d3-447d-a57c-166fd1ead2e0".to_string(),
  44. "556DAB6773CA26DDAAA114F7044138CA".to_string(),
  45. "rust_Test123".to_string(),
  46. );
  47. let req: ResponseData = okx_exc.okx_order("CORE-USDT", "cash", "buy", "limit", "0.8555", "1").await;
  48. print!("---响应:code:{}", req.code);
  49. print!("---响应:mes:{}", req.message);
  50. print!("---响应:data:{}", req.data);
  51. }
  52. #[tokio::test]//测试okx获取订单信息
  53. async fn test_okx_get_order() {
  54. //开启代理
  55. http_enable_proxy();
  56. let okx_exc = OkxExc::new(
  57. "a4cf4f54-f4d3-447d-a57c-166fd1ead2e0".to_string(),
  58. "556DAB6773CA26DDAAA114F7044138CA".to_string(),
  59. "rust_Test123".to_string(),
  60. );
  61. let req2: ResponseData = okx_exc.okx_get_order("CORE-USDT", "611949361383612427").await;
  62. print!("---响应:code:{}", req2.code);
  63. print!("---响应:mes:{}", req2.message);
  64. print!("---响应:data:{}", req2.data);
  65. }
  66. #[tokio::test]//测试okx获取订单信息
  67. async fn test_okx_revocation_order() {
  68. //开启代理
  69. http_enable_proxy();
  70. let okx_exc = OkxExc::new(
  71. "a4cf4f54-f4d3-447d-a57c-166fd1ead2e0".to_string(),
  72. "556DAB6773CA26DDAAA114F7044138CA".to_string(),
  73. "rust_Test123".to_string(),
  74. );
  75. let req3: ResponseData = okx_exc.okx_revocation_order("CORE-USDT", "611950727669751811").await;
  76. print!("---响应:code:{}", req3.code);
  77. print!("---响应:mes:{}", req3.message);
  78. print!("---响应:data:{}", req3.data);
  79. }
  80. #[tokio::test]//测试币安-获取K
  81. async fn test_binance_k() {
  82. http_enable_proxy();
  83. let binance_exc = BinanceExc::new(
  84. "a4cf4f54-f4d3-447d-a57c-166fd1ead2e0".to_string(),
  85. "556DAB6773CA26DDAAA114F7044138CA".to_string(),
  86. );
  87. let req3: ResponseData = binance_exc.binance_k("BTCUSDT", "5m", "20").await;
  88. print!("---响应:code:{}", req3.code);
  89. print!("---响应:mes:{}", req3.message);
  90. print!("---响应:data:{}", req3.data);
  91. }
  92. #[tokio::test]//测试币安-深度信息
  93. async fn test_binance_depth() {
  94. http_enable_proxy();
  95. let binance_exc = BinanceExc::new(
  96. "a4cf4f54-f4d3-447d-a57c-166fd1ead2e0".to_string(),
  97. "556DAB6773CA26DDAAA114F7044138CA".to_string(),
  98. );
  99. let req3: ResponseData = binance_exc.binance_depth("BTCUSDT", "20").await;
  100. //println!("---响应:code:{}", req3.code);
  101. //println!("---响应:mes:{}", req3.message);
  102. //println!("---响应:data:{}", req3.data);
  103. }
  104. }
  105. pub struct BinanceExc {
  106. base_url: String,
  107. access_key: String,
  108. secret_key: String,
  109. }
  110. impl BinanceExc {
  111. pub fn new(access_key: String, secret_key: String) -> BinanceExc {
  112. BinanceExc { base_url: "https://api.binance.com".to_string(), access_key, secret_key }
  113. }
  114. //币安-深度信息
  115. pub async fn binance_depth(&self, symbol: &str, limit: &str) -> ResponseData {
  116. let base_url = "/api/v3/depth?symbol=".to_string() + &symbol + "&limit=" + &limit;
  117. let result = self.get(base_url.to_string()).await;
  118. match result {
  119. Ok(req_data) => {
  120. req_data
  121. }
  122. Err(err) => {
  123. let error = ResponseData::error(format!("json 解析失败:{}", err));
  124. error
  125. }
  126. }
  127. }
  128. //币安-k线
  129. pub async fn binance_k(&self, symbol: &str, interval: &str, limit: &str) -> ResponseData {
  130. let base_url = "/api/v3/klines?symbol=".to_string() + &symbol + "&interval=" + &interval + "&limit=" + &limit;
  131. let result = self.get(base_url.to_string()).await;
  132. match result {
  133. Ok(req_data) => {
  134. req_data
  135. }
  136. Err(err) => {
  137. let error = ResponseData::error(format!("json 解析失败:{}", err));
  138. error
  139. }
  140. }
  141. }
  142. //普通get
  143. async fn get(&self, request_path: String) -> Result<(ResponseData), reqwest::Error> {
  144. let mut req_data;
  145. let base_url = self.base_url.clone();
  146. // 发起GET请求
  147. let url = format!("{}{}", base_url, request_path);
  148. let response = reqwest::get(url)
  149. .await?; // 发起请求并等待结果
  150. //println!("-------");
  151. // 检查响应是否成功
  152. if response.status().is_success() {
  153. // 读取响应的内容
  154. let body = response.text().await?;
  155. //println!("Response body:\n{}", body);
  156. req_data = ResponseData::new("0".to_string(), "success".to_string(), body);
  157. } else {
  158. let body = response.text().await?;
  159. //println!("Request failed with status: {}", body);
  160. req_data = ResponseData::error(body.to_string())
  161. }
  162. Ok((req_data))
  163. }
  164. }
  165. //okx 的结构体,以及相关方法
  166. pub struct OkxExc {
  167. base_url: String,
  168. access_key: String,
  169. secret_key: String,
  170. passphrase: String,
  171. }
  172. impl OkxExc {
  173. pub fn new(access_key: String, secret_key: String, passphrase: String) -> OkxExc {
  174. OkxExc {
  175. base_url: "https://www.okx.com".to_string(),
  176. access_key: access_key,
  177. secret_key: secret_key,
  178. passphrase: passphrase,
  179. }
  180. }
  181. //获取订单信息
  182. pub async fn okx_get_order(&self, inst_id: &str, ord_id: &str) -> ResponseData {
  183. let mut btree_map: BTreeMap<&str, &str> = BTreeMap::new();
  184. btree_map.insert("instId", inst_id);//产品Id
  185. btree_map.insert("ordId", ord_id);//顶顶那
  186. let result = self.get_v(
  187. "/api/v5/trade/order".to_string(),
  188. btree_map,
  189. ).await;
  190. self.req_data_analysis(result)
  191. }
  192. //撤单接口
  193. pub async fn okx_revocation_order(&self, inst_id: &str, ord_id: &str) -> ResponseData {
  194. let mut btree_map: BTreeMap<&str, &str> = BTreeMap::new();
  195. btree_map.insert("instId", inst_id);//产品Id
  196. btree_map.insert("ordId", ord_id);//顶顶那
  197. let result = self.post_v(
  198. "/api/v5/trade/cancel-order".to_string(),
  199. btree_map,
  200. ).await;
  201. self.req_data_analysis(result)
  202. }
  203. //下单接口
  204. pub async fn okx_order(&self, inst_id: &str, td_mode: &str, side: &str, ord_type: &str, px: &str, sz: &str) -> ResponseData {
  205. let mut btree_map: BTreeMap<&str, &str> = BTreeMap::new();
  206. btree_map.insert("instId", inst_id);//产品Id
  207. btree_map.insert("tdMode", td_mode);//交易模式
  208. btree_map.insert("side", side);//订单方向
  209. btree_map.insert("ordType", ord_type);//订单类
  210. btree_map.insert("px", px);//委托价格
  211. btree_map.insert("sz", sz);//委托数量
  212. let result = self.post_v(
  213. "/api/v5/trade/order".to_string(),
  214. btree_map,
  215. ).await;
  216. self.req_data_analysis(result)
  217. }
  218. //账户信息
  219. pub async fn okx_acc(&self, ccy: &str) -> ResponseData {
  220. let mut btree_map: BTreeMap<&str, &str> = BTreeMap::new();
  221. btree_map.insert("ccy", ccy);
  222. let result = self.get_v(
  223. "/api/v5/account/balance".to_string(),
  224. btree_map,
  225. ).await;
  226. self.req_data_analysis(result)
  227. }
  228. //带认证-get
  229. pub(crate) async fn get_v(&self, request_path: String, params: BTreeMap<&str, &str>) -> Result<(ResponseData), reqwest::Error> {
  230. let mut req_data: ResponseData;
  231. /*请求接口与 地址*/
  232. let base_url = self.base_url.to_string();
  233. /*账号 密钥 密码*/
  234. let access_key = self.access_key.clone().to_string();
  235. let secret_key = self.secret_key.clone().to_string();
  236. let passphrase = self.passphrase.clone().to_string();
  237. /*签名生成*/
  238. let timestamp = get_timestamp();
  239. let params_str = parse_params_to_str(params);
  240. let get_url_params = format!("{}?{}", request_path, params_str); //接口与参数组装
  241. // 时间戳 + 请求类型+ 请求参数字符串
  242. let message = format!("{}GET{}", timestamp, get_url_params);
  243. // //println!("---message:{:?}", message);
  244. let sign = self.okx_sign(secret_key, message);
  245. //添加请求头
  246. let headers = self.okx_create_header(&access_key, &passphrase, &sign, &timestamp);
  247. let client = reqwest::Client::new();
  248. let req = client.get(base_url + &get_url_params)
  249. .headers(headers);
  250. //println!("--请求头:{:?}", req);
  251. //拿到返回
  252. let response = req.send()
  253. .await?;
  254. // 检查响应是否成功
  255. //println!("---状态:{:?},{}", response.status(), response.status().is_success());
  256. if response.status().is_success() {
  257. // 读取响应的内容
  258. let body = response.text().await?;
  259. //println!("okx_acc-Response body:\n{}", body);
  260. req_data = ResponseData::new("0".to_string(), "success".to_string(), body);
  261. } else {
  262. let body = response.text().await?;
  263. //println!("okx_acc-Request failed with status: {}", body);
  264. req_data = ResponseData::error(body.to_string())
  265. }
  266. Ok((req_data))
  267. }
  268. //普通-get
  269. async fn get_v_n(&self, request_path: String) -> Result<(ResponseData), reqwest::Error> {
  270. let mut req_data;
  271. let base_url = self.base_url.clone();
  272. // 发起GET请求
  273. let url = format!("{}{}", base_url, request_path);
  274. let response = reqwest::get(url)
  275. .await?; // 发起请求并等待结果
  276. //println!("-------");
  277. // 检查响应是否成功
  278. if response.status().is_success() {
  279. // 读取响应的内容
  280. let body = response.text().await?;
  281. //println!("Response body:\n{}", body);
  282. req_data = ResponseData::new("0".to_string(), "success".to_string(), body);
  283. } else {
  284. let body = response.text().await?;
  285. //println!("Request failed with status: {}", body);
  286. req_data = ResponseData::error(body.to_string())
  287. }
  288. Ok((req_data))
  289. }
  290. //带认证-post
  291. async fn post_v(&self, request_path: String, params: BTreeMap<&str, &str>) -> Result<(ResponseData), reqwest::Error> {
  292. let mut req_data: ResponseData;
  293. /*请求接口与 地址*/
  294. let base_url = self.base_url.to_string();
  295. /*账号 密钥 密码*/
  296. let access_key = self.access_key.to_string();
  297. let secret_key = self.secret_key.to_string();
  298. let passphrase = self.passphrase.to_string();
  299. /*签名生成*/
  300. let timestamp = get_timestamp();
  301. let params_str = serde_json::to_string(&params).unwrap();
  302. // let params_json = serde_json::to_value(params_str.clone()).unwrap();
  303. //println!("---params:{:?}", params);
  304. //println!("---params-json_str:{ }", params_str.clone());
  305. // //println!("---params-json:{:?}", params_json);
  306. // 时间戳 + 请求类型+ 请求参数字符串
  307. let message = format!("{}POST{}{}", timestamp, request_path, &params_str);
  308. //println!("---message:{:?}", message);
  309. let sign = self.okx_sign(secret_key, message);
  310. //添加请求头
  311. let headers = self.okx_create_header(&access_key, &passphrase, &sign, &timestamp);
  312. let client = reqwest::Client::new();
  313. let url = format!("{}{}", base_url, request_path);
  314. let req = client
  315. .post(url)
  316. .headers(headers)
  317. .json(&params)
  318. ;
  319. let response = req.send()
  320. .await?;
  321. // 检查响应是否成功
  322. //println!("---状态:{:?},{}", response.status(), response.status().is_success());
  323. if response.status().is_success() {
  324. // 读取响应的内容
  325. let body = response.text().await?;
  326. //println!("okx_order-Response body:\n{}", body);
  327. req_data = ResponseData::new("0".to_string(), "success".to_string(), body);
  328. } else {
  329. let body = response.text().await?;
  330. //println!("okx_order-Request failed with status: {}", body);
  331. req_data = ResponseData::error(body.to_string())
  332. }
  333. Ok((req_data))
  334. }
  335. //okx 签名生成
  336. fn okx_sign(&self, secret_key: String, message: String) -> String {
  337. // 做签名
  338. let hmac_key = ring::hmac::Key::new(hmac::HMAC_SHA256, secret_key.as_bytes());
  339. let result = ring::hmac::sign(&hmac_key, &message.as_bytes());
  340. base64::encode(result)
  341. }
  342. //生成hdear
  343. fn okx_create_header(&self, api_key: &str, passphrase: &str, sign: &str, timestamp: &str) -> HeaderMap {
  344. // 处理请求头 headers
  345. let mut header_map = HeaderMap::new();
  346. header_map.insert(
  347. "OK-ACCESS-KEY",
  348. HeaderValue::from_str(&api_key).unwrap(),
  349. );
  350. header_map.insert(
  351. "OK-ACCESS-SIGN",
  352. HeaderValue::from_str(&sign).unwrap());
  353. header_map.insert(
  354. "OK-ACCESS-TIMESTAMP",
  355. HeaderValue::from_str(&timestamp).unwrap(),
  356. );
  357. header_map.insert(
  358. "OK-ACCESS-PASSPHRASE",
  359. HeaderValue::from_str(&passphrase).unwrap(),
  360. );
  361. header_map.insert(
  362. "CONTENT-TYPE",
  363. HeaderValue::from_static("application/json; charset=UTF-8"),
  364. );
  365. // header_map.insert(
  366. // reqwest::header::CONTENT_TYPE,
  367. // HeaderValue::from_static("application/json; charset=UTF-8"),
  368. // );
  369. header_map
  370. }
  371. //req_data 解析
  372. fn req_data_analysis(&self, result: Result<ResponseData, reqwest::Error>) -> ResponseData {
  373. match result {
  374. Ok(req_data) => {
  375. if req_data.code != "0" {
  376. req_data
  377. } else {
  378. let body: String = req_data.data;
  379. let json_value: serde_json::Value = serde_json::from_str(&body).unwrap();
  380. let code = json_value["code"].as_str().unwrap();
  381. let data = serde_json::to_string(&json_value["data"]).unwrap();
  382. let msg = json_value["msg"].as_str().unwrap();
  383. // //println!("--解析成功----code:{}",code);
  384. // //println!("--解析成功----data:{}",data);
  385. // //println!("--解析成功----msg:{}",msg);
  386. let success = ResponseData::new(code.parse().unwrap(),
  387. msg.parse().unwrap(),
  388. data.parse().unwrap());
  389. success
  390. }
  391. }
  392. Err(err) => {
  393. let error = ResponseData::error(format!("json 解析失败:{}", err));
  394. error
  395. }
  396. }
  397. }
  398. }
  399. pub struct SocketTool {
  400. //连接地址
  401. request_url: String,
  402. //ip
  403. ip: String,
  404. //ip
  405. port: u16,
  406. //是否需要登陆
  407. is_login: bool,
  408. //登陆所需参数
  409. login_param: BTreeMap<String, String>,
  410. //订阅参数
  411. subscription: serde_json::Value,
  412. }
  413. impl SocketTool {
  414. pub fn new(request_url: &str,
  415. is_login: bool,
  416. login_param: BTreeMap<String, String>,
  417. subscription: serde_json::Value, ) -> SocketTool
  418. {
  419. let mut ip_v = "";
  420. let mut port_v = 8080;
  421. let mut v_str = String::from("");
  422. /*******读取环境变量-判定初始化代理地址*******/
  423. let parsing_detail = parsing_environment_variables();
  424. if parsing_detail.ip_address.len() > 0 && parsing_detail.port.len() > 0 {
  425. ip_v = parsing_detail.ip_address.as_str();
  426. port_v = parsing_detail.port.parse().unwrap();
  427. }
  428. /*****返回结构体*******/
  429. SocketTool {
  430. request_url: request_url.to_string(),
  431. ip: ip_v.clone().to_string(),
  432. port: port_v,
  433. is_login: is_login,
  434. login_param: login_param,
  435. subscription: subscription,
  436. }
  437. }
  438. pub(crate) fn run<F, Fut>(&self, parse_fn: F)
  439. where
  440. F: Fn(ResponseData) -> Fut + 'static + Send + Sync,
  441. Fut: Future<Output=()> + Send + 'static,
  442. {
  443. /*****消息溜***/
  444. let mut stdout = io::stdout();
  445. let mut stderr = io::stderr();
  446. /*****socket配置信息***/
  447. let request_url = Url::parse(self.request_url.as_str()).unwrap();
  448. let parse_fn = Arc::new(Mutex::new(parse_fn)); // Wrap the closure in an Arc<Mutex<_>>
  449. let login_param = self.login_param.clone();
  450. let lable = login_param.get("lable").unwrap().clone();
  451. /*****判断代理IP是否为空,空则不走代理*****/
  452. if self.ip.len() > 0 {
  453. //println!("----socket-走代理");
  454. let ip_array: Vec<&str> = self.ip.split(".").collect();
  455. let proxy_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(
  456. ip_array[0].parse().unwrap(),
  457. ip_array[1].parse().unwrap(),
  458. ip_array[2].parse().unwrap(),
  459. ip_array[3].parse().unwrap())
  460. ), self.port);
  461. let websocket_config = Some(WebSocketConfig {
  462. max_send_queue: Some(16),
  463. max_message_size: Some(16 * 1024 * 1024),
  464. max_frame_size: Some(16 * 1024 * 1024),
  465. accept_unmasked_frames: false,
  466. });
  467. let max_redirects = 5;
  468. let (mut socket, response) =
  469. connect_with_proxy(request_url.clone(), proxy_address, websocket_config, max_redirects)
  470. .expect("Can't connect(无法连接)");
  471. /******登陆认证********/
  472. if self.is_login {
  473. //println!("----需要登陆");
  474. let login_json_str = SocketTool::log_in_to_str(login_param);
  475. // //println!("---组装 登陆信息:{0}", login_json_str);
  476. socket.write_message(Message::Text(login_json_str)).unwrap();
  477. thread::sleep(Duration::from_secs(1));
  478. } else {
  479. //println!("----no longin(不需要登陆)");
  480. }
  481. /******订阅信息********/
  482. let sub_json = self.subscription.clone();
  483. //println!("--订阅内容:{:?}", sub_json);
  484. let sub_json_str = sub_json.to_string();
  485. // writeln!(stdout, "subscribe info: {:?}", sub_json_str).unwrap();
  486. socket.write_message(Message::Text(sub_json_str))
  487. .unwrap();
  488. /******数据读取********/
  489. let mut okx_ping_time = Utc::now().timestamp();
  490. let mut okx_ping_off = false;
  491. loop {
  492. // if !socket.can_read() {
  493. // //println!("不能读取的socket");
  494. // continue;
  495. // }
  496. let msg = socket.read_message();
  497. //数据解析
  498. match msg {
  499. Ok(Message::Text(text)) => {
  500. let rsp_data = SocketTool::message_ok_unscramble(text);
  501. if rsp_data.code.to_string() == "0".to_string() {
  502. let parse_fn = Arc::clone(&parse_fn); // Clone the Arc for each iteration
  503. tokio::spawn(async move {
  504. let parse_fn = parse_fn.lock().await;
  505. parse_fn(rsp_data).await;
  506. });
  507. tokio::spawn(async move {});
  508. }
  509. if lable.to_string() == "okx".to_string() {
  510. if (Utc::now().timestamp() - okx_ping_time) > 2 {
  511. socket.write_message(Message::Ping(Vec::from("ping"))).expect("TODO: panic message");
  512. writeln!(stdout, "主动------:{:?}---ping", lable).unwrap();
  513. okx_ping_time = Utc::now().timestamp();
  514. okx_ping_off = true;
  515. }
  516. }
  517. }
  518. Ok(Message::Ping(s)) => {
  519. writeln!(stdout, "Ping-响应--{:?}", String::from_utf8(s)).unwrap();
  520. let mut op_str = "".to_string();
  521. if lable.to_string() == "binance".to_string() {
  522. op_str = "pong".to_string();
  523. socket.write_message(Message::Pong(Vec::from(op_str.clone()))).expect("TODO: panic message");
  524. writeln!(stdout, "ping----------:{:?}---{:?}", lable, op_str).unwrap();
  525. }
  526. }
  527. Ok(Message::Pong(s)) => {
  528. writeln!(stdout, "Pong-响应--{:?}", String::from_utf8(s)).unwrap();
  529. if lable.to_string() == "okx".to_string() {
  530. okx_ping_off = false;
  531. okx_ping_time = Utc::now().timestamp();
  532. }
  533. }
  534. Ok(Message::Close(_)) => {
  535. writeln!(stderr, "socket 关闭: ").unwrap();
  536. }
  537. Err(error) => {
  538. writeln!(stderr, "Error receiving message: {}", error).unwrap();
  539. let rsp_data = ResponseData::error("socket 发生错误!".to_string());
  540. tokio::spawn(async move {
  541. let parse_fn = parse_fn.lock().await;
  542. parse_fn(rsp_data).await;
  543. });
  544. break;
  545. }
  546. _ => {}
  547. }
  548. }
  549. socket.close(None).unwrap();
  550. } else {
  551. // 提示,并未找到好的优化方式,
  552. //println!("----socket-没代理");
  553. let (mut socket, response) =
  554. connect(request_url.clone())
  555. .expect("Can't connect(无法连接)");
  556. /******登陆认证********/
  557. if self.is_login {
  558. //println!("----需要登陆");
  559. let login_json_str = SocketTool::log_in_to_str(login_param);
  560. // //println!("---组装 登陆信息:{0}", login_json_str);
  561. socket.write_message(Message::Text(login_json_str)).unwrap();
  562. thread::sleep(Duration::from_secs(1));
  563. } else {
  564. //println!("----no longin(不需要登陆)");
  565. }
  566. /******订阅信息********/
  567. let sub_json = self.subscription.clone();
  568. //println!("--订阅内容:{:?}", sub_json);
  569. let sub_json_str = sub_json.to_string();
  570. writeln!(stdout, "subscribe info: {:?}", sub_json_str).unwrap();
  571. socket.write_message(Message::Text(sub_json_str))
  572. .unwrap();
  573. /******数据读取********/
  574. let mut okx_ping_time = Utc::now().timestamp();
  575. let mut okx_ping_off = false;
  576. loop {
  577. if !socket.can_read() {
  578. continue;
  579. }
  580. let msg = socket.read_message();
  581. match msg {
  582. Ok(Message::Text(text)) => {
  583. let rsp_data = SocketTool::message_ok_unscramble(text);
  584. if rsp_data.code.to_string() == "0".to_string() {
  585. let parse_fn = Arc::clone(&parse_fn); // Clone the Arc for each iteration
  586. tokio::spawn(async move {
  587. let parse_fn = parse_fn.lock().await;
  588. parse_fn(rsp_data).await;
  589. });
  590. }
  591. // // writeln!(stdout, "---接收数据:{0}", text).unwrap();
  592. // //转json
  593. // let json_value: serde_json::Value = serde_json::from_str(&text).unwrap();
  594. // if json_value.get("result").is_some() {
  595. // writeln!(stdout, "-币安--订阅成功:{0}", text).unwrap();
  596. // } else if json_value.get("event").is_some() {
  597. // writeln!(stdout, "-OKX--订阅成功:{0}", text).unwrap();
  598. // } else {
  599. // // --推送数据
  600. // // writeln!(stdout, "---推送数据:{0}", text).unwrap();
  601. // let rsp_data = ResponseData::new("0".to_string(),
  602. // "success".to_string(),
  603. // text);
  604. //
  605. // let parse_fn = Arc::clone(&parse_fn); // Clone the Arc for each iteration
  606. //
  607. // tokio::spawn(async move {
  608. // let parse_fn = parse_fn.lock().await;
  609. // parse_fn(rsp_data).await;
  610. // });
  611. // }
  612. if lable.to_string() == "okx".to_string() {
  613. if (Utc::now().timestamp() - okx_ping_time) > 2 {
  614. socket.write_message(Message::Ping(Vec::from("ping"))).expect("TODO: panic message");
  615. writeln!(stdout, "主动------:{:?}---ping", lable).unwrap();
  616. okx_ping_time = Utc::now().timestamp();
  617. okx_ping_off = true;
  618. }
  619. }
  620. }
  621. Ok(Message::Ping(s)) => {
  622. writeln!(stdout, "Ping-响应--{:?}", String::from_utf8(s)).unwrap();
  623. let mut op_str = "".to_string();
  624. if lable.to_string() == "binance".to_string() {
  625. op_str = "pong".to_string();
  626. socket.write_message(Message::Pong(Vec::from(op_str.clone()))).expect("TODO: panic message");
  627. writeln!(stdout, "ping----------:{:?}---{:?}", lable, op_str).unwrap();
  628. }
  629. }
  630. Ok(Message::Pong(s)) => {
  631. writeln!(stdout, "Pong-响应--{:?}", String::from_utf8(s)).unwrap();
  632. if lable.to_string() == "okx".to_string() {
  633. okx_ping_off = false;
  634. okx_ping_time = Utc::now().timestamp();
  635. }
  636. }
  637. Ok(Message::Close(_)) => {
  638. writeln!(stderr, "socket 关闭: ").unwrap();
  639. }
  640. Err(error) => {
  641. writeln!(stderr, "Error receiving message: {}", error).unwrap();
  642. let rsp_data = ResponseData::error("socket 发生错误!".to_string());
  643. tokio::spawn(async move {
  644. let parse_fn = parse_fn.lock().await;
  645. parse_fn(rsp_data).await;
  646. });
  647. break;
  648. }
  649. _ => {}
  650. }
  651. }
  652. socket.close(None).unwrap();
  653. }
  654. }
  655. fn log_in_to_str(login_param: BTreeMap<String, String>) -> String {
  656. let mut login_json_str = String::from("");
  657. //解析并且组装 认证信息
  658. let lable = login_param.get("lable");
  659. if let Some(ref_string) = lable {
  660. if *ref_string == "binance" {
  661. //println!("----币安 暂不做登陆");
  662. } else if *ref_string == "okx" {
  663. let mut access_key: String = "".to_string();
  664. let mut secret_key: String = "".to_string();
  665. let mut passphrase: String = "".to_string();
  666. for (key, value) in &login_param {
  667. // //println!("Key: {}, Value: {}", key, value);
  668. if key == "access_key" {
  669. access_key = value.parse().unwrap();
  670. } else if key == "secret_key" {
  671. secret_key = value.parse().unwrap();
  672. } else if key == "passphrase" {
  673. passphrase = value.parse().unwrap();
  674. }
  675. }
  676. let timestamp = Utc::now().timestamp().to_string();
  677. // 时间戳 + 请求类型+ 请求参数字符串
  678. let message = format!("{}GET{}", timestamp, "/users/self/verify");
  679. // //println!("---message:{:?}", message);
  680. let hmac_key = ring::hmac::Key::new(hmac::HMAC_SHA256, secret_key.as_bytes());
  681. let result = ring::hmac::sign(&hmac_key, &message.as_bytes());
  682. let mut sign = base64::encode(result);
  683. let login_json = json!({
  684. "op": "login",
  685. "args": [{
  686. "apiKey": access_key,
  687. "passphrase": passphrase,
  688. "timestamp": timestamp,
  689. "sign": sign }]
  690. });
  691. // //println!("---login_json:{0}", login_json.to_string());
  692. // //println!("--登陆:{:?}", login_json);
  693. login_json_str = login_json.to_string();
  694. }
  695. } else {
  696. //println!("Option is None(lable 为None)");
  697. }
  698. login_json_str
  699. }
  700. //推送数据解析
  701. fn message_ok_unscramble(text: String) -> ResponseData {
  702. let mut rsp_data = ResponseData::new("-1".to_string(),
  703. "success".to_string(),
  704. text.parse().unwrap());
  705. let mut stdout = io::stdout();
  706. //转json
  707. let json_value: serde_json::Value = serde_json::from_str(&text).unwrap();
  708. if json_value.get("result").is_some() {
  709. writeln!(stdout, "-币安--订阅成功:{0}", text).unwrap();
  710. rsp_data.code = "99".parse().unwrap()
  711. } else if json_value.get("event").is_some() {
  712. writeln!(stdout, "-OKX--订阅成功:{0}", text).unwrap();
  713. rsp_data.code = "98".parse().unwrap()
  714. } else {
  715. // --推送数据
  716. // writeln!(stdout, "---推送数据:{0}", text).unwrap();
  717. let rsp_json: serde_json::Value = serde_json::from_str(&*text).unwrap();
  718. if rsp_json.get("arg").is_some() {
  719. let channel = rsp_json["arg"]["channel"].as_str().unwrap_or("----");
  720. if channel != "account" {
  721. rsp_data.code = "0".parse().unwrap();
  722. }
  723. } else {
  724. rsp_data.code = "0".parse().unwrap();
  725. }
  726. }
  727. rsp_data
  728. }
  729. //币安--自定义-订阅
  730. pub fn binance_run_custom(b_array: Vec<&str>, parse_fn: impl Fn(ResponseData)) {}
  731. //币安--深度信息
  732. pub fn binance_run_kline<F, Fut>(b_array: Vec<&str>, parse_fn: F)
  733. where
  734. F: Fn(ResponseData) -> Fut + 'static + Send + Sync,
  735. Fut: Future<Output=()> + Send + 'static,
  736. {
  737. SocketTool::binance_run(b_array, "kline_1s".to_string(), parse_fn);
  738. }
  739. //币安--深度信息
  740. pub fn binance_run_depth<F, Fut>(b_array: Vec<&str>, levels: String, parse_fn: F)
  741. where
  742. F: Fn(ResponseData) -> Fut + 'static + Send + Sync,
  743. Fut: Future<Output=()> + Send + 'static,
  744. {
  745. let str = format!("depth{}@100ms", levels);
  746. SocketTool::binance_run(b_array, str.to_string(), parse_fn);
  747. }
  748. //币安--订阅
  749. pub fn binance_run<F, Fut>(b_array: Vec<&str>, subscription_name: String, parse_fn: F)
  750. where
  751. F: Fn(ResponseData) -> Fut + 'static + Send + Sync,
  752. Fut: Future<Output=()> + Send + 'static,
  753. {
  754. let mut params = vec![];
  755. for item in &b_array {
  756. let mut b_name = item.to_lowercase();
  757. b_name = format!("{}@{}", b_name, subscription_name);
  758. params.push(b_name);
  759. }
  760. // 币安----socket
  761. let subscription = json!({
  762. "method": "SUBSCRIBE",
  763. "params":params,
  764. // [
  765. // "btcusdt@depth@100ms",//深度信息
  766. // "btcusdt@kline_1s"//k线信息
  767. // ],
  768. "id": 1
  769. });
  770. let mut btree_map: BTreeMap<String, String> = BTreeMap::new();
  771. btree_map.insert("lable".parse().unwrap(), "binance".parse().unwrap());//产品Id
  772. let biance_socke = SocketTool::new("wss://stream.binance.com:443/ws",
  773. false,
  774. btree_map,
  775. subscription,
  776. );
  777. biance_socke.run(parse_fn);
  778. }
  779. //OKX-私有频道-订单信息
  780. pub fn okx_pr_run_orders<F, Fut>(b_array: Vec<&str>, access_key: String, secret_key: String, passphrase: String, parse_fn: F)
  781. where
  782. F: Fn(ResponseData) -> Fut + 'static + Send + Sync,
  783. Fut: Future<Output=()> + Send + 'static,
  784. {
  785. let mut btree_map: BTreeMap<String, String> = BTreeMap::new();
  786. btree_map.insert("lable".parse().unwrap(), "okx".parse().unwrap());//
  787. btree_map.insert("access_key".parse().unwrap(), access_key);//
  788. btree_map.insert("secret_key".parse().unwrap(), secret_key);//
  789. btree_map.insert("passphrase".parse().unwrap(), passphrase);//
  790. //组装推送信息
  791. let mut args = vec![];
  792. for item in &b_array {
  793. let mut b_name = item.replace("_", "-").to_uppercase();
  794. let mut map: HashMap<String, String> = HashMap::new();
  795. // 添加键值对
  796. map.insert("channel".to_string(), "orders".to_string());
  797. map.insert("instType".to_string(), "SPOT".to_string());
  798. map.insert("instId".to_string(), b_name.to_string());
  799. args.push(map);
  800. }
  801. let mut map_acc: HashMap<String, String> = HashMap::new();
  802. // 添加键值对
  803. map_acc.insert("channel".to_string(), "account".to_string());
  804. map_acc.insert("ccy".to_string(), "USDT".to_string());
  805. args.push(map_acc);
  806. SocketTool::okx_pr_run(b_array, args, btree_map, parse_fn);
  807. }
  808. //OKX-私有频道-订阅
  809. pub fn okx_pr_run<F, Fut>(b_array: Vec<&str>, args: Vec<HashMap<String, String>>, btree_map: BTreeMap<String, String>, parse_fn: F)
  810. where
  811. F: Fn(ResponseData) -> Fut + 'static + Send + Sync,
  812. Fut: Future<Output=()> + Send + 'static,
  813. {
  814. let url = "wss://ws.okx.com:8443/ws/v5/private";
  815. let pu = "wss://ws.okx.com:8443/ws/v5/public";
  816. let subscription = json!({
  817. "op": "subscribe",
  818. "args":args
  819. // [
  820. // // { "channel": "account", "ccy": "USDT" }, //账户
  821. // { "channel": "orders", "instType": "SPOT", "instId":"BTC-USDT" }
  822. // ]
  823. });
  824. let okx_socke = SocketTool::new(url,
  825. true,
  826. btree_map,
  827. subscription);
  828. okx_socke.run(parse_fn);
  829. }
  830. }
  831. //http请求是否开启代理
  832. pub fn http_enable_proxy() -> bool {
  833. //拿到环境变量解析的数据
  834. let parsing_detail = parsing_environment_variables();
  835. if parsing_detail.ip_address.len() > 0 && parsing_detail.port.len() > 0 {
  836. let http_proxy = format!("http://{}:{}", parsing_detail.ip_address, parsing_detail.port);
  837. env::set_var("http_proxy", http_proxy.clone());
  838. env::set_var("https_proxy", http_proxy.clone());
  839. //println!("代理设置成功{0}", http_proxy.to_string());
  840. true
  841. } else {
  842. //println!("无法开启代理:环境变量获取失败:{:?}", parsing_detail);
  843. false
  844. }
  845. }
  846. #[derive(Debug)]
  847. pub struct ParsingDetail {
  848. ip_address: String,
  849. port: String,
  850. }
  851. impl ParsingDetail {
  852. fn new(ip_address: String,
  853. port: String, ) -> ParsingDetail {
  854. ParsingDetail { ip_address, port }
  855. }
  856. }
  857. //获取环境变量,并解析成ip与port
  858. pub fn parsing_environment_variables() -> ParsingDetail {
  859. let proxy_address = env::var("proxy_address");
  860. // 使用std::env::var函数获取环境变量的值,如果返回Err,则说明环境变量不存在
  861. match proxy_address {
  862. Ok(value) => {
  863. //println!("环境变量读取成功:key:proxy_address , val:{}", value);
  864. env::set_var("http_proxy", value.to_string());
  865. env::set_var("https_proxy", value.to_string());
  866. let ip_port: Vec<&str> = value.split(":").collect();
  867. let parsing_detail = ParsingDetail::new(ip_port[0].to_string(), ip_port[1].to_string());
  868. parsing_detail
  869. }
  870. Err(_) => {
  871. //println!("环境变量读取失败:'proxy_address'");
  872. let parsing_detail = ParsingDetail::new("".to_string(), "".to_string());
  873. parsing_detail
  874. }
  875. }
  876. }
  877. //map数据转 get请求参数
  878. fn parse_params_to_str(parameters: BTreeMap<&str, &str>) -> String {
  879. parameters
  880. .into_iter()
  881. .map(|(key, value)| format!("{}={}", key, value))
  882. .collect::<Vec<String>>()
  883. .join("&")
  884. }
  885. //获取时时间
  886. fn get_timestamp() -> String {
  887. chrono::Utc::now()
  888. .format("%Y-%m-%dT%H:%M:%S%.3fZ")
  889. .to_string()
  890. }
  891. //统一返回
  892. #[derive(Debug)]
  893. #[derive(Clone)]
  894. pub struct ResponseData {
  895. pub code: String,
  896. pub message: String,
  897. pub data: String,
  898. }
  899. impl ResponseData {
  900. pub fn new(code: String, message: String, data: String) -> ResponseData {
  901. // original_string.replace("world", "Rust");
  902. ResponseData { code, message, data }
  903. }
  904. pub fn error(message: String) -> ResponseData {
  905. ResponseData { code: "-1".to_string(), message: "请求失败:".to_string() + &message, data: "".to_string() }
  906. }
  907. }