Pārlūkot izejas kodu

工具集加入

skyfffire 3 nedēļas atpakaļ
vecāks
revīzija
3e8c503acd

+ 1 - 0
.gitignore

@@ -1,2 +1,3 @@
 /target
+/logs
 Cargo.lock

+ 4 - 1
.idea/extended.iml

@@ -1,7 +1,10 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <module type="EMPTY_MODULE" version="4">
   <component name="NewModuleRootManager">
-    <content url="file://$MODULE_DIR$" />
+    <content url="file://$MODULE_DIR$">
+      <sourceFolder url="file://$MODULE_DIR$/src" isTestSource="false" />
+      <excludeFolder url="file://$MODULE_DIR$/target" />
+    </content>
     <orderEntry type="inheritedJdk" />
     <orderEntry type="sourceFolder" forTests="false" />
   </component>

+ 3 - 0
src/exchange/mod.rs

@@ -0,0 +1,3 @@
+mod proxy;
+mod socket_tool;
+mod response_base;

+ 132 - 0
src/exchange/proxy.rs

@@ -0,0 +1,132 @@
+use std::env;
+use std::net::{IpAddr, Ipv4Addr, SocketAddr};
+use tracing::trace;
+
+
+pub enum ProxyEnum {
+    REST,
+    WS,
+}
+
+pub enum ProxyResponseEnum {
+    NO,
+    YES(SocketAddr),
+}
+
+
+/**代理工具*/
+#[derive(Debug)]
+#[derive(Clone)]
+pub struct ParsingDetail {
+    // pub ip_address: String,
+    // pub port: String,
+}
+
+impl ParsingDetail {
+    pub fn env_proxy(proxy_enum: ProxyEnum) -> ProxyResponseEnum {
+        let proxy_address = env::var("proxy_address");
+        // 使用std::env::var函数获取环境变量的值,如果返回Err,则说明环境变量不存在
+        let ip_port = match proxy_address {
+            Ok(value) => unsafe {
+                trace!("环境变量读取成功:key:proxy_address , val:{}", value);
+                env::set_var("http_proxy", value.to_string());
+                env::set_var("https_proxy", value.to_string());
+                value
+            }
+            Err(_) => {
+                trace!("环境变量读取失败:'proxy_address'");
+                "".to_string()
+            }
+        };
+
+        if ip_port.len() > 0 {
+            return match proxy_enum {
+                ProxyEnum::REST => unsafe {
+                    env::set_var("http_proxy", ip_port.to_string());
+                    env::set_var("https_proxy", ip_port.to_string());
+                    ProxyResponseEnum::NO
+                }
+                ProxyEnum::WS => {
+                    let ip_port: Vec<&str> = ip_port.split(":").collect();
+                    let ip_array: Vec<&str> = ip_port[0].split(".").collect();
+                    let proxy = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(
+                        ip_array[0].parse().unwrap(),
+                        ip_array[1].parse().unwrap(),
+                        ip_array[2].parse().unwrap(),
+                        ip_array[3].parse().unwrap())
+                    ), ip_port[1].parse().unwrap());
+                    ProxyResponseEnum::YES(proxy)
+                }
+            }
+        }
+
+        ProxyResponseEnum::NO
+    }
+
+    // fn new(ip_address: String, port: String) -> ParsingDetail {
+    //     ParsingDetail { ip_address, port }
+    // }
+    //
+    // //获取环境变量配置'proxy_address'拿到代理地址
+    // pub fn parsing_environment_variables(is_unusual: Option<&str>) -> ParsingDetail {
+    //     let proxy_address_name = match is_unusual {
+    //         None => {
+    //             "proxy_address"
+    //         }
+    //         Some(v) => {
+    //             match v {
+    //                 "binance" => {
+    //                     "binance_proxy_address"
+    //                 }
+    //                 _ => {
+    //                     "proxy_address"
+    //                 }
+    //             }
+    //         }
+    //     };
+    //     let proxy_address = env::var(proxy_address_name);
+    //     // 使用std::env::var函数获取环境变量的值,如果返回Err,则说明环境变量不存在
+    //     match proxy_address {
+    //         Ok(value) => {
+    //             trace!("环境变量读取成功:key:proxy_address , val:{}", value);
+    //             let ip_port: Vec<&str> = value.split(":").collect();
+    //             let parsing_detail = ParsingDetail::new(ip_port[0].to_string(), ip_port[1].to_string());
+    //             parsing_detail
+    //         }
+    //         Err(_) => {
+    //             trace!("环境变量读取失败:'proxy_address'");
+    //             let parsing_detail = ParsingDetail::new("".to_string(), "".to_string());
+    //             parsing_detail
+    //         }
+    //     }
+    // }
+    //
+    // //http请求是否开启代理:HTTP 只需要调用该方法即可
+    // //原理是 设置全局代理,所以程序如果要走代理只需要执行一次,后续的get,post..都会走代理
+    // pub fn http_enable_proxy(is_unusual: Option<&str>) -> bool {
+    //     //拿到环境变量解析的数据
+    //     let parsing_detail = Self::parsing_environment_variables(is_unusual);
+    //     if parsing_detail.ip_address.len() > 0 && parsing_detail.port.len() > 0 {
+    //         let http_proxy = format!("http://{}:{}", parsing_detail.ip_address, parsing_detail.port);
+    //         env::set_var("http_proxy", http_proxy.clone());
+    //         env::set_var("https_proxy", http_proxy.clone());
+    //         trace!("代理设置成功{0}", http_proxy.to_string());
+    //         true
+    //     } else {
+    //         trace!("无法开启代理:环境变量获取失败:{:?}", parsing_detail);
+    //         false
+    //     }
+    // }
+    //
+    // pub fn removes_proxy(is_unusual: Option<&str>) -> bool {
+    //     //拿到环境变量解析的数据
+    //     let parsing_detail = Self::parsing_environment_variables(is_unusual);
+    //     if parsing_detail.ip_address.len() > 0 && parsing_detail.port.len() > 0 {
+    //         env::remove_var("http_proxy");
+    //         env::remove_var("https_proxy");
+    //         true
+    //     } else {
+    //         false
+    //     }
+    // }
+}

+ 50 - 0
src/exchange/response_base.rs

@@ -0,0 +1,50 @@
+use serde_json::Value;
+use tokio::time::Instant;
+
+/**交易所返回数据处理之后,统一保存格式,为了内部其他接口调用*/
+#[derive(Debug, Clone)]
+pub struct Response {
+    pub label: String,
+    pub code: i16,
+    pub message: String,
+    pub channel: String,
+    pub data: Value,
+    pub ins: Instant,           // 数据接收的ins
+    pub time: i64,              // 数据接受的时间
+    pub reach_time: i64,        // 远程数据时间 弃用
+    pub data_type: String       // 數據類型, 例如 bybit 深度信息:snapshot(全量),delta(增量)
+}
+
+impl Response {
+    pub fn new(label: String, code: i16, message: String, data: Value) -> Response {
+        Response {
+            label,
+            code,
+            message,
+            data,
+            channel: "".to_string(),
+            time: 0,
+            reach_time: 0,
+            data_type: String::new(),
+            ins: Instant::now(),
+        }
+    }
+    pub fn error(label: String, message: String) -> Response {
+        Response {
+            label,
+            code: -1,
+            message: format!("{}", &message),
+            data: Value::Null,
+            channel: "".to_string(),
+            time: 0,
+            reach_time: 0,
+            data_type: String::new(),
+            ins: Instant::now(),
+        }
+    }
+
+    pub fn to_string(&self) -> String {
+        format!("{:?}", self)
+    }
+}
+

+ 281 - 0
src/exchange/socket_tool.rs

@@ -0,0 +1,281 @@
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::time::Duration;
+
+use chrono::Utc;
+use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
+use futures_util::{future, pin_mut, SinkExt, StreamExt};
+use serde_json::{Value};
+use tokio::net::TcpStream;
+use tokio::sync::Mutex;
+use tokio::time::Instant;
+use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream};
+use tokio_tungstenite::tungstenite::{Error, Message};
+use tracing::{error, info, trace, warn};
+
+use crate::exchange::proxy;
+use crate::exchange::proxy::{ProxyEnum, ProxyResponseEnum};
+use crate::exchange::response_base::Response;
+
+#[derive(Debug)]
+pub enum HeartbeatType {
+    // Ping,
+    // Pong,
+    Custom(String),
+}
+
+pub struct AbstractWsMode {}
+
+impl AbstractWsMode {
+    pub async fn ws_connected<T, PI, PO, F, B, Future>(write_to_socket_rx_arc: Arc<Mutex<UnboundedReceiver<Message>>>,
+                                                       is_first_login: bool,
+                                                       label: String,
+                                                       is_shutdown_arc: Arc<AtomicBool>,
+                                                       handle_function: &F,
+                                                       subscribe_array: Vec<String>,
+                                                       ws_stream: WebSocketStream<MaybeTlsStream<TcpStream>>,
+                                                       message_text: T,
+                                                       message_ping: PI,
+                                                       message_pong: PO,
+                                                       message_binary: B)
+        where T: Fn(String) -> Option<Response> + Copy,
+              PI: Fn(Vec<u8>) -> Option<Response> + Copy,
+              PO: Fn(Vec<u8>) -> Option<Response> + Copy,
+              F: Fn(Response) -> Future + Clone,
+              B: Fn(Vec<u8>) -> Option<Response> + Copy,
+              Future: future::Future<Output=()> + Send + 'static,
+    {
+        let (ws_write, mut ws_read) = ws_stream.split();
+        let ws_write_arc = Arc::new(Mutex::new(ws_write));
+
+        // 将socket 的写操作与【写通道(外部向socket写)】链接起来,将数据以ok的结构体封装进行传递
+        // 这里是形成链式操作,如果要将外界的信息传进来(使用socket查单、下单之类的,部分交易所可以支持),就要这要弄
+        let mut write_to_socket_rx = write_to_socket_rx_arc.lock().await;
+        let ws_write_channel_clone = Arc::clone(&ws_write_arc);
+        let stdin_to_ws = async {
+            while let Some(message) = write_to_socket_rx.next().await {
+                let mut write_lock2 = ws_write_channel_clone.lock().await;
+                write_lock2.send(message).await?;
+            }
+            Ok::<(), Error>(())
+        };
+        // 如果不需要事先登录,则直接订阅消息
+        if !is_first_login {
+            trace!("不需要先登录,订阅内容:");
+            for s in &subscribe_array {
+                trace!("{}", s);
+                let mut write_lock = ws_write_arc.lock().await;
+                write_lock.send(Message::Text(s.parse().unwrap())).await.expect("订阅消息失败");
+            }
+        }
+
+        let ws_write_inner = Arc::clone(&ws_write_arc);
+        let ws_to_stdout = async {
+            while let Some(message) = ws_read.next().await {
+                if !is_shutdown_arc.load(Ordering::Relaxed) {
+                    continue;
+                }
+
+                let response_data = AbstractWsMode::analysis_message(message, message_text, message_ping, message_pong, message_binary);
+                // let response_data = func(message);
+                if response_data.is_some() {
+                    let mut data = response_data.unwrap();
+                    data.label = label.clone();
+
+                    let code = data.code.clone();
+
+                    if code == 200 {
+                        let mut data_c = data.clone();
+                        data_c.ins = Instant::now();
+                        data_c.time = Utc::now().timestamp_millis();
+
+                        handle_function(data_c).await;
+                    }
+
+                    /*
+                        200 -正确返回
+                       -200 -登录成功
+                       -201 -订阅成功
+                       -300 -客户端收到服务器心跳ping,需要响应
+                       -301 -客户端收到服务器心跳pong,需要响应
+                       -302 -客户端收到服务器心跳自定义,需要响应自定义
+                    */
+                    match code {
+                        200 => {
+                            let mut data_c = data.clone();
+                            data_c.ins = Instant::now();
+                            data_c.time = Utc::now().timestamp_millis();
+
+                            handle_function(data_c).await;
+                        }
+                        -200 => {
+                            // 登录成功
+                            info!("ws登录成功:{:?}", data);
+                            if is_first_login {
+                                for s in &subscribe_array {
+                                    info!("订阅内容:{}", s);
+                                    let mut write_lock = ws_write_arc.lock().await;
+                                    write_lock.send(Message::Text(s.parse().unwrap())).await.expect("订阅消息失败");
+                                }
+                                info!("订阅完成!");
+                            }
+                        }
+                        -201 => {
+                            // 订阅成功
+                            // trace!("订阅成功:{:?}", data);
+                        }
+                        -300 => {
+                            // 服务器发送心跳 ping 给客户端,客户端需要pong回应
+                            trace!("服务器响应-ping");
+                            if data.data != Value::Null {
+                                let mut ws_write = ws_write_inner.lock().await;
+                                ws_write.send(Message::Pong(Vec::from(data.data.to_string()))).await?;
+                                trace!("客户端回应服务器-pong");
+                            }
+                        }
+                        -301 => {
+                            // 服务器发送心跳 pong 给客户端,客户端需要ping回应
+                            trace!("服务器响应-pong");
+                            if data.data != Value::Null {
+                                let mut ws_write = ws_write_inner.lock().await;
+                                ws_write.send(Message::Ping(Vec::from(data.data.to_string()))).await?;
+                                trace!("客户端回应服务器-ping");
+                            }
+                        }
+                        -302 => {
+                            // 客户端收到服务器心跳自定义,需要响应自定义
+                            trace!("特定字符心跳,特殊响应:{:?}", data);
+                            let mut ws_write = ws_write_inner.lock().await;
+                            ws_write.send(Message::Text(data.data.to_string())).await?;
+                            trace!("特殊字符心跳-回应完成");
+                        }
+                        _ => {
+                            error!("未知:{:?}", data);
+                        }
+                    }
+                }
+            }
+            Ok::<(), Error>(())
+        };
+
+        //必须操作。,因为不同于其他的高级语言,有自动内存管理,所以为了防范地址改变,所以需要做此处理
+        pin_mut!(stdin_to_ws, ws_to_stdout,);
+        future::select(stdin_to_ws, ws_to_stdout).await;
+    }
+
+    //创建链接
+    pub async fn ws_connect_async<T, PI, PO, F, B, Future>(is_shutdown_arc: Arc<AtomicBool>,
+                                                           handle_function: F,
+                                                           address_url: String,
+                                                           is_first_login: bool,
+                                                           label: String,
+                                                           subscribe_array: Vec<String>,
+                                                           write_to_socket_rx_arc: Arc<Mutex<UnboundedReceiver<Message>>>,
+                                                           message_text: T,
+                                                           message_ping: PI,
+                                                           message_pong: PO,
+                                                           message_binary: B)
+        where T: Fn(String) -> Option<Response> + Copy,
+              PI: Fn(Vec<u8>) -> Option<Response> + Copy,
+              PO: Fn(Vec<u8>) -> Option<Response> + Copy,
+              B: Fn(Vec<u8>) -> Option<Response> + Copy,
+              F: Fn(Response) -> Future + Clone,
+              Future: future::Future<Output=()> + Send + 'static,
+    {
+        // 1.是否走代理
+        let proxy = match proxy::ParsingDetail::env_proxy(ProxyEnum::WS) {
+            ProxyResponseEnum::NO => {
+                // trace!("非 代理");
+                None
+            }
+            ProxyResponseEnum::YES(proxy) => {
+                // trace!("代理");
+                Option::from(proxy)
+            }
+        };
+
+        match connect_async(address_url.clone(), proxy).await {
+            Ok((ws_stream, _)) => {
+                trace!("socket 链接成功,{}。", address_url);
+
+                Self::ws_connected(write_to_socket_rx_arc,
+                                   is_first_login,
+                                   label,
+                                   is_shutdown_arc,
+                                   &handle_function,
+                                   subscribe_array.clone(),
+                                   ws_stream,
+                                   message_text,
+                                   message_ping,
+                                   message_pong,
+                                   message_binary).await;
+            }
+            Err(e) => {
+                warn!("WebSocket 握手失败:{:?}", e);
+            }
+        }
+    }
+
+    // 自动心跳包
+    pub async fn ping_pong(write_tx_clone: Arc<Mutex<UnboundedSender<Message>>>, h_type: HeartbeatType, millis: u64) {
+        loop {
+            tokio::time::sleep(Duration::from_millis(millis)).await;
+            let write_tx_clone = write_tx_clone.lock().await;
+            match write_tx_clone.unbounded_send(
+                match h_type {
+                    // HeartbeatType::Ping => {
+                    //     Message::Ping(Vec::from("Ping"))
+                    // }
+                    // HeartbeatType::Pong => {
+                    //     Message::Pong(Vec::from("Pong"))
+                    // }
+                    HeartbeatType::Custom(ref str) => {
+                        Message::Text(str.parse().unwrap())
+                    }
+                }
+            ) {
+                Ok(_o) => {
+                    trace!("发送指令-心跳:{:?}",h_type);
+                }
+                Err(k) => {
+                    error!("发送失败:原因{:?}",k)
+                }
+            }
+        }
+    }
+
+    // 数据解析
+    pub fn analysis_message<T, PI, PO, B>(message: Result<Message, Error>,
+                                          message_text: T,
+                                          message_ping: PI,
+                                          message_pong: PO,
+                                          message_binary: B) -> Option<Response>
+        where T: Fn(String) -> Option<Response>,
+              PI: Fn(Vec<u8>) -> Option<Response>,
+              PO: Fn(Vec<u8>) -> Option<Response>,
+              B: Fn(Vec<u8>) -> Option<Response>
+    {
+        match message {
+            Ok(Message::Text(text)) => message_text(text),
+            Ok(Message::Ping(pi)) => message_ping(pi),
+            Ok(Message::Pong(po)) => message_pong(po),
+            Ok(Message::Binary(s)) => message_binary(s), //二进制WebSocket消息
+            Ok(Message::Close(c)) => {
+                let message_str = format!("关闭指令:{:?}", c);
+                trace!("{:?}",message_str);
+                Option::from(Response::new("".to_string(), 0, message_str, Value::Null))
+            }
+            Ok(Message::Frame(f)) => {
+                //原始帧 正常读取数据不会读取到该 信息类型
+                let message_str = format!("意外读取到原始帧:{:?}", f);
+                trace!("{:?}",message_str);
+                Option::from(Response::new("".to_string(), -2, message_str, Value::Null))
+            }
+            Err(e) => {
+                let message_str = format!("服务器响应:{:?}", e);
+                trace!("{:?}",message_str);
+                Option::from(Response::new("".to_string(), -1, message_str, Value::Null))
+            }
+        }
+    }
+}

+ 10 - 2
src/main.rs

@@ -1,3 +1,11 @@
-fn main() {
-    println!("Hello, world!");
+mod utils;
+mod exchange;
+
+use utils::log_setup;
+
+#[tokio::main]
+async fn main() {
+    let _guards = log_setup::setup_logging().unwrap();
+
+    tracing::info!("Hello, world!");
 }

+ 20 - 20
src/utils/README.md

@@ -12,22 +12,22 @@ This is the main entry point of the library. It provides:
 
 - **Key Derivation and Signing**
 
-  - `grind_key`: Deterministically derives a StarkNet-compatible private key from a seed using SHA-256 and modular reduction.
-  - `get_private_key_from_eth_signature`: Extracts a private key from an Ethereum signature, using the `grind_key` function.
-  - `sign_message`: Signs a message using StarkNet ECDSA and returns a `StarkSignature` struct.
+    - `grind_key`: Deterministically derives a StarkNet-compatible private key from a seed using SHA-256 and modular reduction.
+    - `get_private_key_from_eth_signature`: Extracts a private key from an Ethereum signature, using the `grind_key` function.
+    - `sign_message`: Signs a message using StarkNet ECDSA and returns a `StarkSignature` struct.
 
 - **Message Hashing Functions**
 
-  - `get_order_hash`: Computes the hash for an order message, given all order parameters as strings.
-  - `get_transfer_hash`: Computes the hash for a transfer message, given all transfer parameters as strings.
-  - `get_withdrawal_hash`: Computes the hash for a withdrawal message, given all withdrawal parameters as strings.
+    - `get_order_hash`: Computes the hash for an order message, given all order parameters as strings.
+    - `get_transfer_hash`: Computes the hash for a transfer message, given all transfer parameters as strings.
+    - `get_withdrawal_hash`: Computes the hash for a withdrawal message, given all withdrawal parameters as strings.
 
 - **Types**
 
-  - `StarkSignature`: Holds the `r`, `s`, and `v` components of a StarkNet ECDSA signature.
+    - `StarkSignature`: Holds the `r`, `s`, and `v` components of a StarkNet ECDSA signature.
 
 - **Testing**
-  - Comprehensive unit tests for key derivation, signing, and message hash computation.
+    - Comprehensive unit tests for key derivation, signing, and message hash computation.
 
 ---
 
@@ -37,25 +37,25 @@ This module defines the core data structures and hashing logic for StarkNet off-
 
 - **Traits**
 
-  - `Hashable`: For types that can be hashed with a Poseidon hash and a selector.
-  - `OffChainMessage`: For types that represent off-chain messages and can be hashed with domain separation and a public key.
+    - `Hashable`: For types that can be hashed with a Poseidon hash and a selector.
+    - `OffChainMessage`: For types that represent off-chain messages and can be hashed with domain separation and a public key.
 
 - **Domain and Message Types**
 
-  - `StarknetDomain`: Represents the domain for message separation (name, version, chain_id, revision).
-  - `AssetId`, `PositionId`, `AssetAmount`, `Timestamp`: Basic types for message construction.
-  - `Order`, `TransferArgs`, `WithdrawalArgs`: Main message types for orders, transfers, and withdrawals.
+    - `StarknetDomain`: Represents the domain for message separation (name, version, chain_id, revision).
+    - `AssetId`, `PositionId`, `AssetAmount`, `Timestamp`: Basic types for message construction.
+    - `Order`, `TransferArgs`, `WithdrawalArgs`: Main message types for orders, transfers, and withdrawals.
 
 - **Hash Implementations**
 
-  - Each message type implements `Hashable` and provides a unique selector and hashing logic using the Poseidon hash function.
+    - Each message type implements `Hashable` and provides a unique selector and hashing logic using the Poseidon hash function.
 
 - **Constants**
 
-  - `SEPOLIA_DOMAIN`: A pre-defined domain for the Sepolia testnet.
+    - `SEPOLIA_DOMAIN`: A pre-defined domain for the Sepolia testnet.
 
 - **Testing**
-  - Unit tests for selectors, hashing, and message hash computation for all message types.
+    - Unit tests for selectors, hashing, and message hash computation for all message types.
 
 ---
 
@@ -77,10 +77,10 @@ use rust_crypto_lib_base::{get_order_hash, sign_message, get_private_key_from_et
 
 // Compute an order hash
 let hash = get_order_hash(
-    "1".to_string(), "0x2".to_string(), "100".to_string(), "0x1".to_string(),
-    "-156".to_string(), "0x1".to_string(), "74".to_string(), "100".to_string(),
-    "123".to_string(), "0x...".to_string(), "Perpetuals".to_string(), "v0".to_string(),
-    "SN_SEPOLIA".to_string(), "1".to_string()
+"1".to_string(), "0x2".to_string(), "100".to_string(), "0x1".to_string(),
+"-156".to_string(), "0x1".to_string(), "74".to_string(), "100".to_string(),
+"123".to_string(), "0x...".to_string(), "Perpetuals".to_string(), "v0".to_string(),
+"SN_SEPOLIA".to_string(), "1".to_string()
 )?;
 
 // Sign a message