Quellcode durchsuchen

推送消息添加更新ID

gepangpang vor 1 Jahr
Ursprung
Commit
83a50fa07b

+ 3 - 1
standard/src/binance_handle.rs

@@ -20,13 +20,15 @@ pub fn format_special_ticker(data: serde_json::Value, label: String) -> SpecialD
     let ap = Decimal::from_str(data["a"].as_str().unwrap()).unwrap();
     let aq = Decimal::from_str(data["B"].as_str().unwrap()).unwrap();
     let mp = (bp + ap) * dec!(0.5);
+    let t = Decimal::from_str(data["u"].as_str().unwrap()).unwrap();
 
-    let ticker_info = SpecialTicker { sell: ap, buy: bp, mid_price: mp };
+    let ticker_info = SpecialTicker { sell: ap, buy: bp, mid_price: mp, t };
     let depth_info = vec![bp, bq, ap, aq];
     SpecialDepth {
         name: label,
         depth: depth_info,
         ticker: ticker_info,
+        t,
     }
 }
 

+ 5 - 1
standard/src/bitget_spot_handle.rs

@@ -1,5 +1,7 @@
 use std::str::FromStr;
+use chrono::Utc;
 use rust_decimal::Decimal;
+use rust_decimal::prelude::FromPrimitive;
 use rust_decimal_macros::dec;
 use serde_json::json;
 use tracing::trace;
@@ -120,12 +122,14 @@ pub fn format_special_ticker(data: serde_json::Value, label: String) -> SpecialD
     let ap = Decimal::from_str(data["askPr"].as_str().unwrap()).unwrap();
     let aq = Decimal::from_str(data["askSz"].as_str().unwrap()).unwrap();
     let mp = (bp + ap) * dec!(0.5);
+    let t = Decimal::from_i64(Utc::now().timestamp_micros()).unwrap();
 
-    let ticker_info = SpecialTicker { sell: ap, buy: bp, mid_price: mp };
+    let ticker_info = SpecialTicker { sell: ap, buy: bp, mid_price: mp, t };
     let depth_info = vec![bp, bq, ap, aq];
     SpecialDepth {
         name: label,
         depth: depth_info,
         ticker: ticker_info,
+        t,
     }
 }

+ 12 - 1
standard/src/handle_info.rs

@@ -1,4 +1,6 @@
 use std::cmp::Ordering;
+use std::str::FromStr;
+use chrono::Utc;
 use rust_decimal::{Decimal};
 use rust_decimal::prelude::FromPrimitive;
 use rust_decimal_macros::dec;
@@ -133,34 +135,42 @@ impl HandleSwapInfo {
         let res_data_json: serde_json::Value = serde_json::from_str(&*res_data_str).unwrap();
         let mut depth_asks: Vec<MarketOrder>;
         let mut depth_bids: Vec<MarketOrder>;
+        let t: Decimal;
         match exchange {
             ExchangeEnum::BinanceSpot => {
                 depth_asks = binance_handle::format_depth_items(res_data_json["asks"].clone());
                 depth_bids = binance_handle::format_depth_items(res_data_json["bids"].clone());
+                t = Decimal::from_str(&res_data_json["u"].as_str().unwrap()).unwrap();
             }
             ExchangeEnum::BinanceSwap => {
                 depth_asks = binance_handle::format_depth_items(res_data_json["a"].clone());
                 depth_bids = binance_handle::format_depth_items(res_data_json["b"].clone());
+                t = Decimal::from_str(&res_data_json["u"].as_str().unwrap()).unwrap();
             }
             ExchangeEnum::GateSwap => {
                 depth_asks = gate_handle::format_depth_items(res_data_json["asks"].clone());
                 depth_bids = gate_handle::format_depth_items(res_data_json["bids"].clone());
+                t = Decimal::from_i64(Utc::now().timestamp_micros()).unwrap();
             }
             ExchangeEnum::KucoinSwap => {
                 depth_asks = kucoin_handle::format_depth_items(res_data_json["asks"].clone());
                 depth_bids = kucoin_handle::format_depth_items(res_data_json["bids"].clone());
+                t = Decimal::from_str(&res_data_json["sequence"].as_str().unwrap()).unwrap();
             }
             ExchangeEnum::KucoinSpot => {
                 depth_asks = kucoin_spot_handle::format_depth_items(res_data_json["asks"].clone());
                 depth_bids = kucoin_spot_handle::format_depth_items(res_data_json["bids"].clone());
+                t = Decimal::from_str(&res_data_json["sequence"].as_str().unwrap()).unwrap();
             }
             ExchangeEnum::OkxSwap => {
                 depth_asks = okx_handle::format_depth_items(res_data_json[0]["asks"].clone());
                 depth_bids = okx_handle::format_depth_items(res_data_json[0]["bids"].clone());
+                t = Decimal::from_i64(Utc::now().timestamp_micros()).unwrap();
             }
             ExchangeEnum::BitgetSpot => {
                 depth_asks = bitget_spot_handle::format_depth_items(res_data_json[0]["asks"].clone());
                 depth_bids = bitget_spot_handle::format_depth_items(res_data_json[0]["bids"].clone());
+                t = Decimal::from_i64(Utc::now().timestamp_micros()).unwrap();
             }
             _ => {
                 error!("未找到该交易所!handle_special_depth: {:?}",exchange);
@@ -219,12 +229,13 @@ impl HandleSwapInfo {
             }
         }
 
-        let ticker_info = SpecialTicker { sell: depth_asks[0].price, buy: depth_bids[0].price, mid_price: mp };
+        let ticker_info = SpecialTicker { sell: depth_asks[0].price, buy: depth_bids[0].price, mid_price: mp, t };
         let depth_info = bp.iter().cloned().chain(bv.iter().cloned()).chain(ap.iter().cloned()).chain(av.iter().cloned()).collect();
         SpecialDepth {
             name: res_data.label,
             depth: depth_info,
             ticker: ticker_info,
+            t,
         }
     }
 }

+ 3 - 1
standard/src/kucoin_handle.rs

@@ -45,13 +45,15 @@ pub fn format_special_ticker(data: serde_json::Value, label: String) -> SpecialD
     let ap = Decimal::from_str(&data["bestAskPrice"].as_str().unwrap()).unwrap();
     let aq = Decimal::from_f64(data["bestAskSize"].as_f64().unwrap()).unwrap();
     let mp = (bp + ap) * dec!(0.5);
+    let t = Decimal::from_f64(data["sequence"].as_f64().unwrap()).unwrap();
 
-    let ticker_info = SpecialTicker { sell: ap, buy: bp, mid_price: mp };
+    let ticker_info = SpecialTicker { sell: ap, buy: bp, mid_price: mp, t };
     let depth_info = vec![bp, bq, ap, aq];
     SpecialDepth {
         name: label,
         depth: depth_info,
         ticker: ticker_info,
+        t,
     }
 }
 

+ 3 - 1
standard/src/kucoin_spot_handle.rs

@@ -120,12 +120,14 @@ pub fn format_special_ticker(data: serde_json::Value, label: String) -> SpecialD
     let ap = Decimal::from_str(data["bestAsk"].as_str().unwrap()).unwrap();
     let aq = Decimal::from_str(data["bestAskSize"].as_str().unwrap()).unwrap();
     let mp = (bp + ap) * dec!(0.5);
+    let t = Decimal::from_str(data["sequence"].as_str().unwrap()).unwrap();
 
-    let ticker_info = SpecialTicker { sell: ap, buy: bp, mid_price: mp };
+    let ticker_info = SpecialTicker { sell: ap, buy: bp, mid_price: mp, t };
     let depth_info = vec![bp, bq, ap, aq];
     SpecialDepth {
         name: label,
         depth: depth_info,
         ticker: ticker_info,
+        t,
     }
 }

+ 5 - 0
standard/src/lib.rs

@@ -142,11 +142,13 @@ impl Depth {
 /// - `name<String>`: 平台信息;
 /// - `depth(Vec<Decimal>)`: 深度信息;
 /// - `ticker(SpecialTicker)`: 市场行情;
+/// - ``
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub struct SpecialDepth {
     pub name: String,
     pub depth: Vec<Decimal>,
     pub ticker: SpecialTicker,
+    pub t: Decimal,
 }
 
 impl SpecialDepth {
@@ -155,6 +157,7 @@ impl SpecialDepth {
             name: "".to_string(),
             depth: vec![],
             ticker: SpecialTicker::new(),
+            t: Default::default(),
         }
     }
 }
@@ -168,6 +171,7 @@ pub struct SpecialTicker {
     pub sell: Decimal,
     pub buy: Decimal,
     pub mid_price: Decimal,
+    pub t: Decimal,
 }
 
 impl SpecialTicker {
@@ -176,6 +180,7 @@ impl SpecialTicker {
             sell: Default::default(),
             buy: Default::default(),
             mid_price: Default::default(),
+            t: Default::default(),
         }
     }
 }

+ 5 - 1
standard/src/okx_handle.rs

@@ -1,5 +1,7 @@
 use std::str::FromStr;
+use chrono::Utc;
 use rust_decimal::Decimal;
+use rust_decimal::prelude::FromPrimitive;
 use rust_decimal_macros::dec;
 use serde::{Deserialize, Serialize};
 use tracing::trace;
@@ -127,13 +129,15 @@ pub fn format_special_ticker(data: serde_json::Value, label: String) -> SpecialD
     let ap = Decimal::from_str(asks[0].as_str().unwrap()).unwrap();
     let aq = Decimal::from_str(asks[1].as_str().unwrap()).unwrap();
     let mp = (bp + ap) * dec!(0.5);
+    let t = Decimal::from_i64(Utc::now().timestamp_micros()).unwrap();
 
-    let ticker_info = SpecialTicker { sell: ap, buy: bp, mid_price: mp };
+    let ticker_info = SpecialTicker { sell: ap, buy: bp, mid_price: mp, t };
     let depth_info = vec![bp, bq, ap, aq];
     SpecialDepth {
         name: label,
         depth: depth_info,
         ticker: ticker_info,
+        t,
     }
 }
 

+ 81 - 78
standard/tests/exchange_test.rs

@@ -1,25 +1,28 @@
 use std::collections::{BTreeMap};
-use std::{env};
+// use std::{env};
 use std::io::{Error};
-use std::sync::Arc;
-use std::sync::atomic::AtomicBool;
-use std::time::Duration;
-use rust_decimal_macros::dec;
+// use std::sync::Arc;
+// use std::sync::atomic::AtomicBool;
+// use std::time::Duration;
+// use rust_decimal_macros::dec;
 use tokio::sync::mpsc::{channel, Receiver, Sender};
-use tokio::try_join;
-use tracing::{error, trace};
+// use tokio::try_join;
+// use tracing::{error, trace};
+use tracing::{error};
+use exchanges::kucoin_spot_ws::KucoinSpotSubscribeType;
 // use exchanges::bitget_spot_ws::{BitgetSpotWs, BitgetSubscribeType, BitgetWsType};
-use exchanges::kucoin_spot_ws::{KucoinSpotWs, KucoinSubscribeType, KucoinWsType};
+// use exchanges::kucoin_spot_ws::{KucoinSpotWs, KucoinSpotSubscribeType, KucoinSpotWsType};
 // use exchanges::binance_spot_ws::{BinanceSpotSubscribeType, BinanceSpotWs, BinanceSpotWsType};
 // use exchanges::okx_swap_ws::{OkxSubscribeType, OkxSwapWs, OkxWsType};
 // use exchanges::kucoin_swap_ws::{KucoinSubscribeType, KucoinSwapWs, KucoinWsType};
-use exchanges::response_base::ResponseData;
+// use exchanges::response_base::ResponseData;
 use standard::exchange::{Exchange, ExchangeEnum};
 // use standard::{binance_handle, Order, Platform, utils};
 // use standard::{okx_handle, Order, Platform, utils};
 // use standard::{kucoin_handle, Order, Platform, utils};
-use standard::{kucoin_spot_handle, Order, Platform, utils};
+// use standard::{kucoin_spot_handle, Order, Platform, utils};
 // use standard::{bitget_spot_handle, Order, Platform, utils};
+use standard::{Order, Platform, utils};
 
 // 创建实体
 #[allow(dead_code)]
@@ -106,7 +109,7 @@ pub async fn test_new_exchange(exchange: ExchangeEnum, symbol: &str) -> Box<dyn
 }
 
 #[allow(dead_code)]
-pub async fn test_new_exchange_wss<T>(exchange: ExchangeEnum, symbol: &str, subscriber_type: T, mold: &str) where Vec<KucoinSubscribeType>: From<T> {
+pub async fn test_new_exchange_wss<T>(exchange: ExchangeEnum, _symbol: &str, _subscriber_type: T, _mold: &str) where Vec<KucoinSpotSubscribeType>: From<T> {
     utils::proxy_handle();
 
     match exchange {
@@ -234,73 +237,73 @@ pub async fn test_new_exchange_wss<T>(exchange: ExchangeEnum, symbol: &str, subs
             // try_join!(t1, t2).unwrap();
         }
         ExchangeEnum::KucoinSpot => {
-            let symbol_format = utils::format_symbol(symbol.to_string(), "-").to_uppercase();
-            let symbol_back = utils::format_symbol(symbol.to_string(), "_");
-            trace!(symbol_format);
-            let name = format!("kucoin_usdt_spot@{}", symbol.to_string().to_lowercase());
-            let bool_v1 = Arc::new(AtomicBool::new(true));
-
-            let (res_sender, mut res_receiver): (Sender<ResponseData>, Receiver<ResponseData>) = channel(1024);
-            let mut params: BTreeMap<String, String> = BTreeMap::new();
-            let access_key = env::var("kucoin_access_key").unwrap_or("".to_string());
-            let secret_key = env::var("kucoin_secret_key").unwrap_or("".to_string());
-            let pass_key = env::var("kucoin_pass_key").unwrap_or("".to_string());
-            params.insert("access_key".to_string(), access_key);
-            params.insert("secret_key".to_string(), secret_key);
-            params.insert("pass_key".to_string(), pass_key);
-
-            let mut exchange_wss = if ["depth", "ticker"].contains(&mold) {
-                KucoinSpotWs::new_label(name, false, params, KucoinWsType::Public, res_sender).await
-            } else {
-                KucoinSpotWs::new_label(name, false, params, KucoinWsType::Private, res_sender).await
-            };
-
-            exchange_wss.set_subscribe(subscriber_type.into());
-
-            let t1 = tokio::spawn(async move {
-                exchange_wss.custom_subscribe(bool_v1, vec![symbol_format]).await;
-            });
-            let mold_arc = Arc::new(mold.to_string());
-            let t2 = tokio::spawn(async move {
-                let mold_clone = Arc::clone(&mold_arc);
-                loop {
-                    tokio::time::sleep(Duration::from_millis(1)).await;
-                    if let Ok(received) = res_receiver.try_recv() {
-                        trace!(?received);
-                        match mold_clone.as_str() {
-                            "depth" => {
-                                if received.data != "" {
-                                    let result = kucoin_spot_handle::handle_special_depth(received);
-                                    trace!(?result)
-                                }
-                            }
-                            "ticker" => {
-                                if received.data != "" {
-                                    let result = kucoin_spot_handle::handle_special_ticker(received);
-                                    trace!(?result)
-                                }
-                            }
-                            "account" => {
-                                if received.data != "" {
-                                    let result = kucoin_spot_handle::handle_account_info(received, symbol_back.clone());
-                                    trace!(?result)
-                                }
-                            }
-                            "orders" => {
-                                if received.data != "" {
-                                    let result = kucoin_spot_handle::handle_order(received, dec!(1));
-                                    trace!(?result)
-                                }
-                            }
-                            _ => {
-                                error!("没有该命令!mode={}", mold_clone);
-                                panic!("没有该命令!mode={}", mold_clone)
-                            }
-                        }
-                    }
-                }
-            });
-            try_join!(t1, t2).unwrap();
+            // let symbol_format = utils::format_symbol(symbol.to_string(), "-").to_uppercase();
+            // let symbol_back = utils::format_symbol(symbol.to_string(), "_");
+            // trace!(symbol_format);
+            // let name = format!("kucoin_usdt_spot@{}", symbol.to_string().to_lowercase());
+            // let bool_v1 = Arc::new(AtomicBool::new(true));
+            //
+            // let (res_sender, mut res_receiver): (Sender<ResponseData>, Receiver<ResponseData>) = channel(1024);
+            // let mut params: BTreeMap<String, String> = BTreeMap::new();
+            // let access_key = env::var("kucoin_access_key").unwrap_or("".to_string());
+            // let secret_key = env::var("kucoin_secret_key").unwrap_or("".to_string());
+            // let pass_key = env::var("kucoin_pass_key").unwrap_or("".to_string());
+            // params.insert("access_key".to_string(), access_key);
+            // params.insert("secret_key".to_string(), secret_key);
+            // params.insert("pass_key".to_string(), pass_key);
+            //
+            // let mut exchange_wss = if ["depth", "ticker"].contains(&mold) {
+            //     KucoinSpotWs::new_label(name, false, params, KucoinSpotWsType::Public).await
+            // } else {
+            //     KucoinSpotWs::new_label(name, false, params, KucoinSpotWsType::Private).await
+            // };
+            //
+            // exchange_wss.set_subscribe(subscriber_type.into());
+            //
+            // let t1 = tokio::spawn(async move {
+            //     exchange_wss.custom_subscribe(bool_v1, vec![symbol_format]).await;
+            // });
+            // let mold_arc = Arc::new(mold.to_string());
+            // let t2 = tokio::spawn(async move {
+            //     let mold_clone = Arc::clone(&mold_arc);
+            //     loop {
+            //         tokio::time::sleep(Duration::from_millis(1)).await;
+            //         if let Ok(received) = res_receiver.try_recv() {
+            //             trace!(?received);
+            //             match mold_clone.as_str() {
+            //                 "depth" => {
+            //                     if received.data != "" {
+            //                         let result = kucoin_spot_handle::handle_special_depth(received);
+            //                         trace!(?result)
+            //                     }
+            //                 }
+            //                 "ticker" => {
+            //                     if received.data != "" {
+            //                         let result = kucoin_spot_handle::handle_special_ticker(received);
+            //                         trace!(?result)
+            //                     }
+            //                 }
+            //                 "account" => {
+            //                     if received.data != "" {
+            //                         let result = kucoin_spot_handle::handle_account_info(received, symbol_back.clone());
+            //                         trace!(?result)
+            //                     }
+            //                 }
+            //                 "orders" => {
+            //                     if received.data != "" {
+            //                         let result = kucoin_spot_handle::handle_order(received, dec!(1));
+            //                         trace!(?result)
+            //                     }
+            //                 }
+            //                 _ => {
+            //                     error!("没有该命令!mode={}", mold_clone);
+            //                     panic!("没有该命令!mode={}", mold_clone)
+            //                 }
+            //             }
+            //         }
+            //     }
+            // });
+            // try_join!(t1, t2).unwrap();
         }
         ExchangeEnum::BitgetSpot => {
             // let symbol_format = utils::format_symbol(symbol.to_string(), "-").to_uppercase();