| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
7037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018 |
- use std::collections::{BTreeMap, HashMap};
- use std::{env, io, thread};
- use std::error::Error;
- use std::future::Future;
- use std::io::{Write};
- use std::net::{IpAddr, Ipv4Addr, SocketAddr};
- use std::str::FromStr;
- use std::sync::Arc;
- use tokio::sync::Mutex;
- use std::time::Duration;
- use chrono::Utc;
- use reqwest;
- use reqwest::header::{HeaderMap, HeaderValue};
- use hex;
- use ring::hmac;
- use serde_json::{json, Map, Value};
- use tungstenite::client::{connect_with_proxy};
- use tungstenite::{connect, Message};
- use tungstenite::protocol::WebSocketConfig;
- use url::Url;
#[cfg(test)]
mod tests {
    use crate::exchange_libs::{http_enable_proxy, BinanceExc, OkxExc, ResponseData};
    use std::env;

    /// Reads a required credential from the environment so that no API secret
    /// has to live in the source tree.
    ///
    /// Panics (failing the calling test) when the variable is not set.
    fn required_env(name: &str) -> String {
        env::var(name).unwrap_or_else(|_| {
            panic!("set the `{}` environment variable to run this test", name)
        })
    }

    /// Enables the HTTP proxy (when configured) and builds an OKX client from
    /// `OKX_ACCESS_KEY`, `OKX_SECRET_KEY` and `OKX_PASSPHRASE`.
    fn okx_client() -> OkxExc {
        http_enable_proxy();
        OkxExc::new(
            required_env("OKX_ACCESS_KEY"),
            required_env("OKX_SECRET_KEY"),
            required_env("OKX_PASSPHRASE"),
        )
    }

    /// Enables the HTTP proxy (when configured) and builds a Binance client.
    ///
    /// The endpoints exercised below go through an unauthenticated GET, so the
    /// keys are optional and default to empty strings.
    fn binance_client() -> BinanceExc {
        http_enable_proxy();
        BinanceExc::new(
            env::var("BINANCE_ACCESS_KEY").unwrap_or_default(),
            env::var("BINANCE_SECRET_KEY").unwrap_or_default(),
        )
    }

    /// Prints the three fields of a response for manual inspection.
    fn print_response(resp: &ResponseData) {
        print!("---响应:code:{}", resp.code);
        print!("---响应:mes:{}", resp.message);
        print!("---响应:data:{}", resp.data);
    }

    // OKX: fetch account balance.
    #[tokio::test]
    async fn test_okx_get_acc() {
        let okx_exc = okx_client();
        let req: ResponseData = okx_exc.okx_acc("USDT").await;
        print_response(&req);
    }

    // OKX: place an order.
    #[tokio::test]
    async fn test_okx_order() {
        let okx_exc = okx_client();
        let req: ResponseData = okx_exc
            .okx_order("CORE-USDT", "cash", "buy", "limit", "0.8555", "1")
            .await;
        print_response(&req);
    }

    // OKX: query a single order.
    #[tokio::test]
    async fn test_okx_get_order() {
        let okx_exc = okx_client();
        let req2: ResponseData = okx_exc.okx_get_order("CORE-USDT", "611949361383612427").await;
        print_response(&req2);
    }

    // OKX: cancel an order.
    #[tokio::test]
    async fn test_okx_revocation_order() {
        let okx_exc = okx_client();
        let req3: ResponseData = okx_exc
            .okx_revocation_order("CORE-USDT", "611950727669751811")
            .await;
        print_response(&req3);
    }

    // Binance: fetch klines.
    #[tokio::test]
    async fn test_binance_k() {
        let binance_exc = binance_client();
        let req3: ResponseData = binance_exc.binance_k("BTCUSDT", "5m", "20").await;
        print_response(&req3);
    }

    // Binance: fetch order-book depth.
    #[tokio::test]
    async fn test_binance_depth() {
        let binance_exc = binance_client();
        let req3: ResponseData = binance_exc.binance_depth("BTCUSDT", "20").await;
        print_response(&req3);
    }
}
- pub struct BinanceExc {
- base_url: String,
- access_key: String,
- secret_key: String,
- }
- impl BinanceExc {
- pub fn new(access_key: String, secret_key: String) -> BinanceExc {
- BinanceExc { base_url: "https://api.binance.com".to_string(), access_key, secret_key }
- }
- //币安-深度信息
- pub async fn binance_depth(&self, symbol: &str, limit: &str) -> ResponseData {
- let base_url = "/api/v3/depth?symbol=".to_string() + &symbol + "&limit=" + &limit;
- let result = self.get(base_url.to_string()).await;
- match result {
- Ok(req_data) => {
- req_data
- }
- Err(err) => {
- let error = ResponseData::error(format!("json 解析失败:{}", err));
- error
- }
- }
- }
- //币安-k线
- pub async fn binance_k(&self, symbol: &str, interval: &str, limit: &str) -> ResponseData {
- let base_url = "/api/v3/klines?symbol=".to_string() + &symbol + "&interval=" + &interval + "&limit=" + &limit;
- let result = self.get(base_url.to_string()).await;
- match result {
- Ok(req_data) => {
- req_data
- }
- Err(err) => {
- let error = ResponseData::error(format!("json 解析失败:{}", err));
- error
- }
- }
- }
- //普通get
- async fn get(&self, request_path: String) -> Result<(ResponseData), reqwest::Error> {
- let mut req_data;
- let base_url = self.base_url.clone();
- // 发起GET请求
- let url = format!("{}{}", base_url, request_path);
- let response = reqwest::get(url)
- .await?; // 发起请求并等待结果
- //println!("-------");
- // 检查响应是否成功
- if response.status().is_success() {
- // 读取响应的内容
- let body = response.text().await?;
- //println!("Response body:\n{}", body);
- req_data = ResponseData::new("0".to_string(), "success".to_string(), body);
- } else {
- let body = response.text().await?;
- //println!("Request failed with status: {}", body);
- req_data = ResponseData::error(body.to_string())
- }
- Ok((req_data))
- }
- }
- //okx 的结构体,以及相关方法
- pub struct OkxExc {
- base_url: String,
- access_key: String,
- secret_key: String,
- passphrase: String,
- }
- impl OkxExc {
- pub fn new(access_key: String, secret_key: String, passphrase: String) -> OkxExc {
- OkxExc {
- base_url: "https://www.okx.com".to_string(),
- access_key: access_key,
- secret_key: secret_key,
- passphrase: passphrase,
- }
- }
- //获取订单信息
- pub async fn okx_get_order(&self, inst_id: &str, ord_id: &str) -> ResponseData {
- let mut btree_map: BTreeMap<&str, &str> = BTreeMap::new();
- btree_map.insert("instId", inst_id);//产品Id
- btree_map.insert("ordId", ord_id);//顶顶那
- let result = self.get_v(
- "/api/v5/trade/order".to_string(),
- btree_map,
- ).await;
- self.req_data_analysis(result)
- }
- //撤单接口
- pub async fn okx_revocation_order(&self, inst_id: &str, ord_id: &str) -> ResponseData {
- let mut btree_map: BTreeMap<&str, &str> = BTreeMap::new();
- btree_map.insert("instId", inst_id);//产品Id
- btree_map.insert("ordId", ord_id);//顶顶那
- let result = self.post_v(
- "/api/v5/trade/cancel-order".to_string(),
- btree_map,
- ).await;
- self.req_data_analysis(result)
- }
- //下单接口
- pub async fn okx_order(&self, inst_id: &str, td_mode: &str, side: &str, ord_type: &str, px: &str, sz: &str) -> ResponseData {
- let mut btree_map: BTreeMap<&str, &str> = BTreeMap::new();
- btree_map.insert("instId", inst_id);//产品Id
- btree_map.insert("tdMode", td_mode);//交易模式
- btree_map.insert("side", side);//订单方向
- btree_map.insert("ordType", ord_type);//订单类
- btree_map.insert("px", px);//委托价格
- btree_map.insert("sz", sz);//委托数量
- let result = self.post_v(
- "/api/v5/trade/order".to_string(),
- btree_map,
- ).await;
- self.req_data_analysis(result)
- }
- //账户信息
- pub async fn okx_acc(&self, ccy: &str) -> ResponseData {
- let mut btree_map: BTreeMap<&str, &str> = BTreeMap::new();
- btree_map.insert("ccy", ccy);
- let result = self.get_v(
- "/api/v5/account/balance".to_string(),
- btree_map,
- ).await;
- self.req_data_analysis(result)
- }
- //带认证-get
- pub(crate) async fn get_v(&self, request_path: String, params: BTreeMap<&str, &str>) -> Result<(ResponseData), reqwest::Error> {
- let mut req_data: ResponseData;
- /*请求接口与 地址*/
- let base_url = self.base_url.to_string();
- /*账号 密钥 密码*/
- let access_key = self.access_key.clone().to_string();
- let secret_key = self.secret_key.clone().to_string();
- let passphrase = self.passphrase.clone().to_string();
- /*签名生成*/
- let timestamp = get_timestamp();
- let params_str = parse_params_to_str(params);
- let get_url_params = format!("{}?{}", request_path, params_str); //接口与参数组装
- // 时间戳 + 请求类型+ 请求参数字符串
- let message = format!("{}GET{}", timestamp, get_url_params);
- // //println!("---message:{:?}", message);
- let sign = self.okx_sign(secret_key, message);
- //添加请求头
- let headers = self.okx_create_header(&access_key, &passphrase, &sign, ×tamp);
- let client = reqwest::Client::new();
- let req = client.get(base_url + &get_url_params)
- .headers(headers);
- //println!("--请求头:{:?}", req);
- //拿到返回
- let response = req.send()
- .await?;
- // 检查响应是否成功
- //println!("---状态:{:?},{}", response.status(), response.status().is_success());
- if response.status().is_success() {
- // 读取响应的内容
- let body = response.text().await?;
- //println!("okx_acc-Response body:\n{}", body);
- req_data = ResponseData::new("0".to_string(), "success".to_string(), body);
- } else {
- let body = response.text().await?;
- //println!("okx_acc-Request failed with status: {}", body);
- req_data = ResponseData::error(body.to_string())
- }
- Ok((req_data))
- }
- //普通-get
- async fn get_v_n(&self, request_path: String) -> Result<(ResponseData), reqwest::Error> {
- let mut req_data;
- let base_url = self.base_url.clone();
- // 发起GET请求
- let url = format!("{}{}", base_url, request_path);
- let response = reqwest::get(url)
- .await?; // 发起请求并等待结果
- //println!("-------");
- // 检查响应是否成功
- if response.status().is_success() {
- // 读取响应的内容
- let body = response.text().await?;
- //println!("Response body:\n{}", body);
- req_data = ResponseData::new("0".to_string(), "success".to_string(), body);
- } else {
- let body = response.text().await?;
- //println!("Request failed with status: {}", body);
- req_data = ResponseData::error(body.to_string())
- }
- Ok((req_data))
- }
- //带认证-post
- async fn post_v(&self, request_path: String, params: BTreeMap<&str, &str>) -> Result<(ResponseData), reqwest::Error> {
- let mut req_data: ResponseData;
- /*请求接口与 地址*/
- let base_url = self.base_url.to_string();
- /*账号 密钥 密码*/
- let access_key = self.access_key.to_string();
- let secret_key = self.secret_key.to_string();
- let passphrase = self.passphrase.to_string();
- /*签名生成*/
- let timestamp = get_timestamp();
- let params_str = serde_json::to_string(¶ms).unwrap();
- // let params_json = serde_json::to_value(params_str.clone()).unwrap();
- //println!("---params:{:?}", params);
- //println!("---params-json_str:{ }", params_str.clone());
- // //println!("---params-json:{:?}", params_json);
- // 时间戳 + 请求类型+ 请求参数字符串
- let message = format!("{}POST{}{}", timestamp, request_path, ¶ms_str);
- //println!("---message:{:?}", message);
- let sign = self.okx_sign(secret_key, message);
- //添加请求头
- let headers = self.okx_create_header(&access_key, &passphrase, &sign, ×tamp);
- let client = reqwest::Client::new();
- let url = format!("{}{}", base_url, request_path);
- let req = client
- .post(url)
- .headers(headers)
- .json(¶ms)
- ;
- let response = req.send()
- .await?;
- // 检查响应是否成功
- //println!("---状态:{:?},{}", response.status(), response.status().is_success());
- if response.status().is_success() {
- // 读取响应的内容
- let body = response.text().await?;
- //println!("okx_order-Response body:\n{}", body);
- req_data = ResponseData::new("0".to_string(), "success".to_string(), body);
- } else {
- let body = response.text().await?;
- //println!("okx_order-Request failed with status: {}", body);
- req_data = ResponseData::error(body.to_string())
- }
- Ok((req_data))
- }
- //okx 签名生成
- fn okx_sign(&self, secret_key: String, message: String) -> String {
- // 做签名
- let hmac_key = ring::hmac::Key::new(hmac::HMAC_SHA256, secret_key.as_bytes());
- let result = ring::hmac::sign(&hmac_key, &message.as_bytes());
- base64::encode(result)
- }
- //生成hdear
- fn okx_create_header(&self, api_key: &str, passphrase: &str, sign: &str, timestamp: &str) -> HeaderMap {
- // 处理请求头 headers
- let mut header_map = HeaderMap::new();
- header_map.insert(
- "OK-ACCESS-KEY",
- HeaderValue::from_str(&api_key).unwrap(),
- );
- header_map.insert(
- "OK-ACCESS-SIGN",
- HeaderValue::from_str(&sign).unwrap());
- header_map.insert(
- "OK-ACCESS-TIMESTAMP",
- HeaderValue::from_str(×tamp).unwrap(),
- );
- header_map.insert(
- "OK-ACCESS-PASSPHRASE",
- HeaderValue::from_str(&passphrase).unwrap(),
- );
- header_map.insert(
- "CONTENT-TYPE",
- HeaderValue::from_static("application/json; charset=UTF-8"),
- );
- // header_map.insert(
- // reqwest::header::CONTENT_TYPE,
- // HeaderValue::from_static("application/json; charset=UTF-8"),
- // );
- header_map
- }
- //req_data 解析
- fn req_data_analysis(&self, result: Result<ResponseData, reqwest::Error>) -> ResponseData {
- match result {
- Ok(req_data) => {
- if req_data.code != "0" {
- req_data
- } else {
- let body: String = req_data.data;
- let json_value: serde_json::Value = serde_json::from_str(&body).unwrap();
- let code = json_value["code"].as_str().unwrap();
- let data = serde_json::to_string(&json_value["data"]).unwrap();
- let msg = json_value["msg"].as_str().unwrap();
- // //println!("--解析成功----code:{}",code);
- // //println!("--解析成功----data:{}",data);
- // //println!("--解析成功----msg:{}",msg);
- let success = ResponseData::new(code.parse().unwrap(),
- msg.parse().unwrap(),
- data.parse().unwrap());
- success
- }
- }
- Err(err) => {
- let error = ResponseData::error(format!("json 解析失败:{}", err));
- error
- }
- }
- }
- }
- pub struct SocketTool {
- //连接地址
- request_url: String,
- //ip
- ip: String,
- //ip
- port: u16,
- //是否需要登陆
- is_login: bool,
- //登陆所需参数
- login_param: BTreeMap<String, String>,
- //订阅参数
- subscription: serde_json::Value,
- }
- impl SocketTool {
- pub fn new(request_url: &str,
- is_login: bool,
- login_param: BTreeMap<String, String>,
- subscription: serde_json::Value, ) -> SocketTool
- {
- let mut ip_v = "";
- let mut port_v = 8080;
- let mut v_str = String::from("");
- /*******读取环境变量-判定初始化代理地址*******/
- let parsing_detail = parsing_environment_variables();
- if parsing_detail.ip_address.len() > 0 && parsing_detail.port.len() > 0 {
- ip_v = parsing_detail.ip_address.as_str();
- port_v = parsing_detail.port.parse().unwrap();
- }
- /*****返回结构体*******/
- SocketTool {
- request_url: request_url.to_string(),
- ip: ip_v.clone().to_string(),
- port: port_v,
- is_login: is_login,
- login_param: login_param,
- subscription: subscription,
- }
- }
- pub(crate) fn run<F, Fut>(&self, parse_fn: F)
- where
- F: Fn(ResponseData) -> Fut + 'static + Send + Sync,
- Fut: Future<Output=()> + Send + 'static,
- {
- /*****消息溜***/
- let mut stdout = io::stdout();
- let mut stderr = io::stderr();
- /*****socket配置信息***/
- let request_url = Url::parse(self.request_url.as_str()).unwrap();
- let parse_fn = Arc::new(Mutex::new(parse_fn)); // Wrap the closure in an Arc<Mutex<_>>
- let login_param = self.login_param.clone();
- let lable = login_param.get("lable").unwrap().clone();
- /*****判断代理IP是否为空,空则不走代理*****/
- if self.ip.len() > 0 {
- //println!("----socket-走代理");
- let ip_array: Vec<&str> = self.ip.split(".").collect();
- let proxy_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(
- ip_array[0].parse().unwrap(),
- ip_array[1].parse().unwrap(),
- ip_array[2].parse().unwrap(),
- ip_array[3].parse().unwrap())
- ), self.port);
- let websocket_config = Some(WebSocketConfig {
- max_send_queue: Some(16),
- max_message_size: Some(16 * 1024 * 1024),
- max_frame_size: Some(16 * 1024 * 1024),
- accept_unmasked_frames: false,
- });
- let max_redirects = 5;
- let (mut socket, response) =
- connect_with_proxy(request_url.clone(), proxy_address, websocket_config, max_redirects)
- .expect("Can't connect(无法连接)");
- /******登陆认证********/
- if self.is_login {
- //println!("----需要登陆");
- let login_json_str = SocketTool::log_in_to_str(login_param);
- // //println!("---组装 登陆信息:{0}", login_json_str);
- socket.write_message(Message::Text(login_json_str)).unwrap();
- thread::sleep(Duration::from_secs(1));
- } else {
- //println!("----no longin(不需要登陆)");
- }
- /******订阅信息********/
- let sub_json = self.subscription.clone();
- //println!("--订阅内容:{:?}", sub_json);
- let sub_json_str = sub_json.to_string();
- // writeln!(stdout, "subscribe info: {:?}", sub_json_str).unwrap();
- socket.write_message(Message::Text(sub_json_str))
- .unwrap();
- /******数据读取********/
- let mut okx_ping_time = Utc::now().timestamp();
- let mut okx_ping_off = false;
- loop {
- // if !socket.can_read() {
- // //println!("不能读取的socket");
- // continue;
- // }
- let msg = socket.read_message();
- //数据解析
- match msg {
- Ok(Message::Text(text)) => {
- let rsp_data = SocketTool::message_ok_unscramble(text);
- if rsp_data.code.to_string() == "0".to_string() {
- let parse_fn = Arc::clone(&parse_fn); // Clone the Arc for each iteration
- tokio::spawn(async move {
- let parse_fn = parse_fn.lock().await;
- parse_fn(rsp_data).await;
- });
- tokio::spawn(async move {});
- }
- if lable.to_string() == "okx".to_string() {
- if (Utc::now().timestamp() - okx_ping_time) > 2 {
- socket.write_message(Message::Ping(Vec::from("ping"))).expect("TODO: panic message");
- writeln!(stdout, "主动------:{:?}---ping", lable).unwrap();
- okx_ping_time = Utc::now().timestamp();
- okx_ping_off = true;
- }
- }
- }
- Ok(Message::Ping(s)) => {
- writeln!(stdout, "Ping-响应--{:?}", String::from_utf8(s)).unwrap();
- let mut op_str = "".to_string();
- if lable.to_string() == "binance".to_string() {
- op_str = "pong".to_string();
- socket.write_message(Message::Pong(Vec::from(op_str.clone()))).expect("TODO: panic message");
- writeln!(stdout, "ping----------:{:?}---{:?}", lable, op_str).unwrap();
- }
- }
- Ok(Message::Pong(s)) => {
- writeln!(stdout, "Pong-响应--{:?}", String::from_utf8(s)).unwrap();
- if lable.to_string() == "okx".to_string() {
- okx_ping_off = false;
- okx_ping_time = Utc::now().timestamp();
- }
- }
- Ok(Message::Close(_)) => {
- writeln!(stderr, "socket 关闭: ").unwrap();
- }
- Err(error) => {
- writeln!(stderr, "Error receiving message: {}", error).unwrap();
- let rsp_data = ResponseData::error("socket 发生错误!".to_string());
- tokio::spawn(async move {
- let parse_fn = parse_fn.lock().await;
- parse_fn(rsp_data).await;
- });
- break;
- }
- _ => {}
- }
- }
- socket.close(None).unwrap();
- } else {
- // 提示,并未找到好的优化方式,
- //println!("----socket-没代理");
- let (mut socket, response) =
- connect(request_url.clone())
- .expect("Can't connect(无法连接)");
- /******登陆认证********/
- if self.is_login {
- //println!("----需要登陆");
- let login_json_str = SocketTool::log_in_to_str(login_param);
- // //println!("---组装 登陆信息:{0}", login_json_str);
- socket.write_message(Message::Text(login_json_str)).unwrap();
- thread::sleep(Duration::from_secs(1));
- } else {
- //println!("----no longin(不需要登陆)");
- }
- /******订阅信息********/
- let sub_json = self.subscription.clone();
- //println!("--订阅内容:{:?}", sub_json);
- let sub_json_str = sub_json.to_string();
- writeln!(stdout, "subscribe info: {:?}", sub_json_str).unwrap();
- socket.write_message(Message::Text(sub_json_str))
- .unwrap();
- /******数据读取********/
- let mut okx_ping_time = Utc::now().timestamp();
- let mut okx_ping_off = false;
- loop {
- if !socket.can_read() {
- continue;
- }
- let msg = socket.read_message();
- match msg {
- Ok(Message::Text(text)) => {
- let rsp_data = SocketTool::message_ok_unscramble(text);
- if rsp_data.code.to_string() == "0".to_string() {
- let parse_fn = Arc::clone(&parse_fn); // Clone the Arc for each iteration
- tokio::spawn(async move {
- let parse_fn = parse_fn.lock().await;
- parse_fn(rsp_data).await;
- });
- }
- // // writeln!(stdout, "---接收数据:{0}", text).unwrap();
- // //转json
- // let json_value: serde_json::Value = serde_json::from_str(&text).unwrap();
- // if json_value.get("result").is_some() {
- // writeln!(stdout, "-币安--订阅成功:{0}", text).unwrap();
- // } else if json_value.get("event").is_some() {
- // writeln!(stdout, "-OKX--订阅成功:{0}", text).unwrap();
- // } else {
- // // --推送数据
- // // writeln!(stdout, "---推送数据:{0}", text).unwrap();
- // let rsp_data = ResponseData::new("0".to_string(),
- // "success".to_string(),
- // text);
- //
- // let parse_fn = Arc::clone(&parse_fn); // Clone the Arc for each iteration
- //
- // tokio::spawn(async move {
- // let parse_fn = parse_fn.lock().await;
- // parse_fn(rsp_data).await;
- // });
- // }
- if lable.to_string() == "okx".to_string() {
- if (Utc::now().timestamp() - okx_ping_time) > 2 {
- socket.write_message(Message::Ping(Vec::from("ping"))).expect("TODO: panic message");
- writeln!(stdout, "主动------:{:?}---ping", lable).unwrap();
- okx_ping_time = Utc::now().timestamp();
- okx_ping_off = true;
- }
- }
- }
- Ok(Message::Ping(s)) => {
- writeln!(stdout, "Ping-响应--{:?}", String::from_utf8(s)).unwrap();
- let mut op_str = "".to_string();
- if lable.to_string() == "binance".to_string() {
- op_str = "pong".to_string();
- socket.write_message(Message::Pong(Vec::from(op_str.clone()))).expect("TODO: panic message");
- writeln!(stdout, "ping----------:{:?}---{:?}", lable, op_str).unwrap();
- }
- }
- Ok(Message::Pong(s)) => {
- writeln!(stdout, "Pong-响应--{:?}", String::from_utf8(s)).unwrap();
- if lable.to_string() == "okx".to_string() {
- okx_ping_off = false;
- okx_ping_time = Utc::now().timestamp();
- }
- }
- Ok(Message::Close(_)) => {
- writeln!(stderr, "socket 关闭: ").unwrap();
- }
- Err(error) => {
- writeln!(stderr, "Error receiving message: {}", error).unwrap();
- let rsp_data = ResponseData::error("socket 发生错误!".to_string());
- tokio::spawn(async move {
- let parse_fn = parse_fn.lock().await;
- parse_fn(rsp_data).await;
- });
- break;
- }
- _ => {}
- }
- }
- socket.close(None).unwrap();
- }
- }
- fn log_in_to_str(login_param: BTreeMap<String, String>) -> String {
- let mut login_json_str = String::from("");
- //解析并且组装 认证信息
- let lable = login_param.get("lable");
- if let Some(ref_string) = lable {
- if *ref_string == "binance" {
- //println!("----币安 暂不做登陆");
- } else if *ref_string == "okx" {
- let mut access_key: String = "".to_string();
- let mut secret_key: String = "".to_string();
- let mut passphrase: String = "".to_string();
- for (key, value) in &login_param {
- // //println!("Key: {}, Value: {}", key, value);
- if key == "access_key" {
- access_key = value.parse().unwrap();
- } else if key == "secret_key" {
- secret_key = value.parse().unwrap();
- } else if key == "passphrase" {
- passphrase = value.parse().unwrap();
- }
- }
- let timestamp = Utc::now().timestamp().to_string();
- // 时间戳 + 请求类型+ 请求参数字符串
- let message = format!("{}GET{}", timestamp, "/users/self/verify");
- // //println!("---message:{:?}", message);
- let hmac_key = ring::hmac::Key::new(hmac::HMAC_SHA256, secret_key.as_bytes());
- let result = ring::hmac::sign(&hmac_key, &message.as_bytes());
- let mut sign = base64::encode(result);
- let login_json = json!({
- "op": "login",
- "args": [{
- "apiKey": access_key,
- "passphrase": passphrase,
- "timestamp": timestamp,
- "sign": sign }]
- });
- // //println!("---login_json:{0}", login_json.to_string());
- // //println!("--登陆:{:?}", login_json);
- login_json_str = login_json.to_string();
- }
- } else {
- //println!("Option is None(lable 为None)");
- }
- login_json_str
- }
- //推送数据解析
- fn message_ok_unscramble(text: String) -> ResponseData {
- let mut rsp_data = ResponseData::new("-1".to_string(),
- "success".to_string(),
- text.parse().unwrap());
- let mut stdout = io::stdout();
- //转json
- let json_value: serde_json::Value = serde_json::from_str(&text).unwrap();
- if json_value.get("result").is_some() {
- writeln!(stdout, "-币安--订阅成功:{0}", text).unwrap();
- rsp_data.code = "99".parse().unwrap()
- } else if json_value.get("event").is_some() {
- writeln!(stdout, "-OKX--订阅成功:{0}", text).unwrap();
- rsp_data.code = "98".parse().unwrap()
- } else {
- // --推送数据
- // writeln!(stdout, "---推送数据:{0}", text).unwrap();
- let rsp_json: serde_json::Value = serde_json::from_str(&*text).unwrap();
- if rsp_json.get("arg").is_some() {
- let channel = rsp_json["arg"]["channel"].as_str().unwrap_or("----");
- if channel != "account" {
- rsp_data.code = "0".parse().unwrap();
- }
- } else {
- rsp_data.code = "0".parse().unwrap();
- }
- }
- rsp_data
- }
- //币安--自定义-订阅
- pub fn binance_run_custom(b_array: Vec<&str>, parse_fn: impl Fn(ResponseData)) {}
- //币安--深度信息
- pub fn binance_run_kline<F, Fut>(b_array: Vec<&str>, parse_fn: F)
- where
- F: Fn(ResponseData) -> Fut + 'static + Send + Sync,
- Fut: Future<Output=()> + Send + 'static,
- {
- SocketTool::binance_run(b_array, "kline_1s".to_string(), parse_fn);
- }
- //币安--深度信息
- pub fn binance_run_depth<F, Fut>(b_array: Vec<&str>, levels: String, parse_fn: F)
- where
- F: Fn(ResponseData) -> Fut + 'static + Send + Sync,
- Fut: Future<Output=()> + Send + 'static,
- {
- let str = format!("depth{}@100ms", levels);
- SocketTool::binance_run(b_array, str.to_string(), parse_fn);
- }
- //币安--订阅
- pub fn binance_run<F, Fut>(b_array: Vec<&str>, subscription_name: String, parse_fn: F)
- where
- F: Fn(ResponseData) -> Fut + 'static + Send + Sync,
- Fut: Future<Output=()> + Send + 'static,
- {
- let mut params = vec![];
- for item in &b_array {
- let mut b_name = item.to_lowercase();
- b_name = format!("{}@{}", b_name, subscription_name);
- params.push(b_name);
- }
- // 币安----socket
- let subscription = json!({
- "method": "SUBSCRIBE",
- "params":params,
- // [
- // "btcusdt@depth@100ms",//深度信息
- // "btcusdt@kline_1s"//k线信息
- // ],
- "id": 1
- });
- let mut btree_map: BTreeMap<String, String> = BTreeMap::new();
- btree_map.insert("lable".parse().unwrap(), "binance".parse().unwrap());//产品Id
- let biance_socke = SocketTool::new("wss://stream.binance.com:443/ws",
- false,
- btree_map,
- subscription,
- );
- biance_socke.run(parse_fn);
- }
- //OKX-私有频道-订单信息
- pub fn okx_pr_run_orders<F, Fut>(b_array: Vec<&str>, access_key: String, secret_key: String, passphrase: String, parse_fn: F)
- where
- F: Fn(ResponseData) -> Fut + 'static + Send + Sync,
- Fut: Future<Output=()> + Send + 'static,
- {
- let mut btree_map: BTreeMap<String, String> = BTreeMap::new();
- btree_map.insert("lable".parse().unwrap(), "okx".parse().unwrap());//
- btree_map.insert("access_key".parse().unwrap(), access_key);//
- btree_map.insert("secret_key".parse().unwrap(), secret_key);//
- btree_map.insert("passphrase".parse().unwrap(), passphrase);//
- //组装推送信息
- let mut args = vec![];
- for item in &b_array {
- let mut b_name = item.replace("_", "-").to_uppercase();
- let mut map: HashMap<String, String> = HashMap::new();
- // 添加键值对
- map.insert("channel".to_string(), "orders".to_string());
- map.insert("instType".to_string(), "SPOT".to_string());
- map.insert("instId".to_string(), b_name.to_string());
- args.push(map);
- }
- let mut map_acc: HashMap<String, String> = HashMap::new();
- // 添加键值对
- map_acc.insert("channel".to_string(), "account".to_string());
- map_acc.insert("ccy".to_string(), "USDT".to_string());
- args.push(map_acc);
- SocketTool::okx_pr_run(b_array, args, btree_map, parse_fn);
- }
- //OKX-私有频道-订阅
- pub fn okx_pr_run<F, Fut>(b_array: Vec<&str>, args: Vec<HashMap<String, String>>, btree_map: BTreeMap<String, String>, parse_fn: F)
- where
- F: Fn(ResponseData) -> Fut + 'static + Send + Sync,
- Fut: Future<Output=()> + Send + 'static,
- {
- let url = "wss://ws.okx.com:8443/ws/v5/private";
- let pu = "wss://ws.okx.com:8443/ws/v5/public";
- let subscription = json!({
- "op": "subscribe",
- "args":args
- // [
- // // { "channel": "account", "ccy": "USDT" }, //账户
- // { "channel": "orders", "instType": "SPOT", "instId":"BTC-USDT" }
- // ]
- });
- let okx_socke = SocketTool::new(url,
- true,
- btree_map,
- subscription);
- okx_socke.run(parse_fn);
- }
- }
- //http请求是否开启代理
- pub fn http_enable_proxy() -> bool {
- //拿到环境变量解析的数据
- let parsing_detail = parsing_environment_variables();
- if parsing_detail.ip_address.len() > 0 && parsing_detail.port.len() > 0 {
- let http_proxy = format!("http://{}:{}", parsing_detail.ip_address, parsing_detail.port);
- env::set_var("http_proxy", http_proxy.clone());
- env::set_var("https_proxy", http_proxy.clone());
- //println!("代理设置成功{0}", http_proxy.to_string());
- true
- } else {
- //println!("无法开启代理:环境变量获取失败:{:?}", parsing_detail);
- false
- }
- }
/// Proxy address split into its two textual parts.
///
/// Both fields are empty strings when no proxy is configured.
#[derive(Debug)]
pub struct ParsingDetail {
    ip_address: String,
    port: String,
}

impl ParsingDetail {
    /// Bundles an already-split IP and port.
    fn new(ip_address: String, port: String) -> ParsingDetail {
        Self {
            ip_address: ip_address,
            port: port,
        }
    }
}
- //获取环境变量,并解析成ip与port
- pub fn parsing_environment_variables() -> ParsingDetail {
- let proxy_address = env::var("proxy_address");
- // 使用std::env::var函数获取环境变量的值,如果返回Err,则说明环境变量不存在
- match proxy_address {
- Ok(value) => {
- //println!("环境变量读取成功:key:proxy_address , val:{}", value);
- env::set_var("http_proxy", value.to_string());
- env::set_var("https_proxy", value.to_string());
- let ip_port: Vec<&str> = value.split(":").collect();
- let parsing_detail = ParsingDetail::new(ip_port[0].to_string(), ip_port[1].to_string());
- parsing_detail
- }
- Err(_) => {
- //println!("环境变量读取失败:'proxy_address'");
- let parsing_detail = ParsingDetail::new("".to_string(), "".to_string());
- parsing_detail
- }
- }
- }
/// Renders a parameter map as a GET query string: `k1=v1&k2=v2...`.
///
/// Pairs appear in the map's key order; keys and values are written as-is,
/// and an empty map gives an empty string.
fn parse_params_to_str(parameters: BTreeMap<&str, &str>) -> String {
    let mut query = String::new();
    for (name, val) in parameters {
        if !query.is_empty() {
            query.push('&');
        }
        query.push_str(name);
        query.push('=');
        query.push_str(val);
    }
    query
}
- //获取时时间
- fn get_timestamp() -> String {
- chrono::Utc::now()
- .format("%Y-%m-%dT%H:%M:%S%.3fZ")
- .to_string()
- }
/// Uniform result envelope returned by every request helper in this file.
#[derive(Debug, Clone)]
pub struct ResponseData {
    pub code: String,
    pub message: String,
    pub data: String,
}

impl ResponseData {
    /// Wraps the three fields as given.
    pub fn new(code: String, message: String, data: String) -> ResponseData {
        Self { code, message, data }
    }

    /// Builds a failure envelope: code `"-1"`, empty `data`, and `message`
    /// prefixed with the fixed failure marker.
    pub fn error(message: String) -> ResponseData {
        Self {
            code: String::from("-1"),
            message: format!("请求失败:{}", message),
            data: String::new(),
        }
    }
}
|