Bläddra i källkod

binance公共数据接入完成

skyfffire 1 år sedan
förälder
incheckning
8f8666ffb0

+ 4 - 4
standard/src/binance_swap.rs

@@ -145,7 +145,7 @@ impl Platform for BinanceSwap {
     // 获取仓位信息
     async fn get_position(&mut self) -> Result<Vec<Position>, Error> {
         let symbol_format = utils::format_symbol(self.symbol.clone(), "");
-        let ct_val = self.market.ct_val;
+        let ct_val = self.market.multiplier;
         let res_data = self.request.get_position_risk(symbol_format).await;
         if res_data.code == 200 {
             let res_data_json = res_data.data.as_array().unwrap();
@@ -240,7 +240,7 @@ impl Platform for BinanceSwap {
                         max_qty: Decimal::from_str(lot_size_filter["maxQty"].as_str().unwrap()).unwrap(),
                         min_notional: Decimal::from_str(price_filter["minPrice"].as_str().unwrap()).unwrap(),
                         max_notional: Decimal::from_str(price_filter["maxPrice"].as_str().unwrap()).unwrap(),
-                        ct_val: Decimal::ONE,
+                        multiplier: Decimal::ONE,
                     };
                     Ok(result)
                 }
@@ -282,7 +282,7 @@ impl Platform for BinanceSwap {
                         max_qty: Decimal::from_str(lot_size_filter["maxQty"].as_str().unwrap()).unwrap(),
                         min_notional: Decimal::from_str(price_filter["minPrice"].as_str().unwrap()).unwrap(),
                         max_notional: Decimal::from_str(price_filter["maxPrice"].as_str().unwrap()).unwrap(),
-                        ct_val: Decimal::ONE,
+                        multiplier: Decimal::ONE,
                     };
                     Ok(result)
                 }
@@ -357,7 +357,7 @@ impl Platform for BinanceSwap {
     }
 
     async fn take_order(&mut self, custom_id: &str, origin_side: &str, price: Decimal, amount: Decimal) -> Result<Order, Error> {
-        self.take_order_symbol(self.symbol.clone(), self.market.ct_val, custom_id, origin_side, price, amount).await
+        self.take_order_symbol(self.symbol.clone(), self.market.multiplier, custom_id, origin_side, price, amount).await
     }
 
     async fn take_order_symbol(&mut self, symbol: String, ct_val: Decimal, custom_id: &str, origin_side: &str, price: Decimal, amount: Decimal) -> Result<Order, Error> {

+ 9 - 9
standard/src/gate_swap.rs

@@ -128,7 +128,7 @@ impl Platform for GateSwap {
     // 获取持仓信息
     async fn get_position(&mut self) -> Result<Vec<Position>, Error> {
         let symbol_array: Vec<&str> = self.symbol.split("_").collect();
-        let ct_val = self.market.ct_val;
+        let ct_val = self.market.multiplier;
         let res_data = self.request.get_position(symbol_array[1].to_string().to_lowercase(), self.symbol.clone()).await;
         if res_data.code == 200 {
             let res_data_json = res_data.data.as_array().unwrap();
@@ -248,7 +248,7 @@ impl Platform for GateSwap {
                         max_qty,
                         min_notional,
                         max_notional,
-                        ct_val,
+                        multiplier: ct_val,
                     };
                     Ok(result)
                 }
@@ -296,7 +296,7 @@ impl Platform for GateSwap {
                         max_qty,
                         min_notional,
                         max_notional,
-                        ct_val,
+                        multiplier: ct_val,
                     };
                     Ok(result)
                 }
@@ -309,7 +309,7 @@ impl Platform for GateSwap {
     // 获取订单详情
     async fn get_order_detail(&mut self, order_id: &str, custom_id: &str) -> Result<Order, Error> {
         let symbol_array: Vec<&str> = self.symbol.split("_").collect();
-        let ct_val = self.market.ct_val;
+        let ct_val = self.market.multiplier;
         let id = if order_id.eq("") { format!("t-{}", custom_id) } else { order_id.to_string() };
         let res_data = self.request.get_order_details(symbol_array[1].to_string().to_lowercase(), id).await;
         if res_data.code == 200 {
@@ -323,7 +323,7 @@ impl Platform for GateSwap {
     // 获取订单列表
     async fn get_orders_list(&mut self, status: &str) -> Result<Vec<Order>, Error> {
         let symbol_array: Vec<&str> = self.symbol.split("_").collect();
-        let ct_val = self.market.ct_val;
+        let ct_val = self.market.multiplier;
         let res_data = self.request.get_orders(symbol_array[1].to_string().to_lowercase(), status.to_string()).await;
         if res_data.code == 200 {
             let res_data_json = res_data.data.as_array().unwrap();
@@ -337,7 +337,7 @@ impl Platform for GateSwap {
     // 下单接口
     async fn take_order(&mut self, custom_id: &str, origin_side: &str, price: Decimal, amount: Decimal) -> Result<Order, Error> {
         let symbol_array: Vec<&str> = self.symbol.split("_").collect();
-        let ct_val = self.market.ct_val;
+        let ct_val = self.market.multiplier;
         let mut params = json!({
             "text": format!("t-{}", custom_id),
             "contract": self.symbol.to_string(),
@@ -420,7 +420,7 @@ impl Platform for GateSwap {
     // 撤销订单
     async fn cancel_order(&mut self, order_id: &str, custom_id: &str) -> Result<Order, Error> {
         let symbol_array: Vec<&str> = self.symbol.split("_").collect();
-        let ct_val = self.market.ct_val;
+        let ct_val = self.market.multiplier;
         let settle = symbol_array[1].to_string().to_lowercase();
         let id = if order_id.eq("") { format!("t-{}", custom_id) } else { order_id.to_string() };
         let res_data = self.request.cancel_order(settle, id.to_string()).await;
@@ -435,7 +435,7 @@ impl Platform for GateSwap {
     // 批量撤销订单
     async fn cancel_orders(&mut self) -> Result<Vec<Order>, Error> {
         let symbol_array: Vec<&str> = self.symbol.split("_").collect();
-        let ct_val = self.market.ct_val;
+        let ct_val = self.market.multiplier;
         let res_data = self.request.cancel_orders(symbol_array[1].to_string().to_lowercase(), self.symbol.to_string()).await;
         if res_data.code == 200 {
             info!("{}", res_data.data.to_string());
@@ -449,7 +449,7 @@ impl Platform for GateSwap {
 
     async fn cancel_orders_all(&mut self) -> Result<Vec<Order>, Error> {
         let symbol_array: Vec<&str> = self.symbol.split("_").collect();
-        let ct_val = self.market.ct_val;
+        let ct_val = self.market.multiplier;
         let orders_res_data = self.request.get_orders(symbol_array[1].to_string().to_lowercase(), "open".to_string()).await;
         if orders_res_data.code == 200 {
             info!("{}", orders_res_data.data.to_string());

+ 2 - 2
standard/src/lib.rs

@@ -465,7 +465,7 @@ pub struct Market {
     pub max_qty: Decimal,
     pub min_notional: Decimal,
     pub max_notional: Decimal,
-    pub ct_val: Decimal,
+    pub multiplier: Decimal,
 }
 
 impl Market {
@@ -482,7 +482,7 @@ impl Market {
             max_qty: Default::default(),
             min_notional: Default::default(),
             max_notional: Default::default(),
-            ct_val: Default::default(),
+            multiplier: Default::default(),
         }
     }
 }

+ 32 - 41
strategy/src/binance_usdt_swap.rs

@@ -9,6 +9,9 @@ use exchanges::response_base::ResponseData;
 use global::trace_stack::{TraceStack};
 use crate::core::Core;
 use exchanges::binance_swap_ws::{BinanceSwapSubscribeType, BinanceSwapWs, BinanceSwapWsType};
+use standard::exchange::ExchangeEnum;
+use standard::exchange_struct_handler::ExchangeStructHandler;
+use crate::exchange_disguise::{on_depth, on_trade};
 
 // 参考 币安 合约 启动
 pub(crate) async fn reference_binance_swap_run(is_shutdown_arc: Arc<AtomicBool>,
@@ -21,71 +24,59 @@ pub(crate) async fn reference_binance_swap_run(is_shutdown_arc: Arc<AtomicBool>,
         //创建读写通道
         let (write_tx, write_rx) = futures_channel::mpsc::unbounded::<Message>();
         let mut ws = BinanceSwapWs::new_label(name, is_colo, None, BinanceSwapWsType::Public).await;
-        ws.set_symbols(symbols);
         ws.set_subscribe(vec![
-            // BinanceSwapSubscribeType::PuDepth20levels100ms,
             BinanceSwapSubscribeType::PuBookTicker,
-            // BinanceSwapSubscribeType::PuAggTrade
+            BinanceSwapSubscribeType::PuAggTrade
         ]);
 
         // 读取数据
-        let mut update_flag_u = Decimal::ZERO;
         let core_arc_clone = Arc::clone(&core_arc);
+        let multiplier = core_arc_clone.lock().await.platform_rest.get_self_market().multiplier;
+        let run_symbol = symbols.clone()[0].clone();
+
         let fun = move |data: ResponseData| {
             // 在 async 块之前克隆 Arc
             let core_arc_cc = core_arc_clone.clone();
+            let mul = multiplier.clone();
+            let rs = run_symbol.clone();
+
             async move {
                 // 使用克隆后的 Arc,避免 move 语义
-                on_data(core_arc_cc,
-                        &mut update_flag_u,
-                        data).await
+                on_public_data(core_arc_cc, &mul, &rs, &data).await
             }
         };
 
         // 链接
         let write_tx_am = Arc::new(Mutex::new(write_tx));
+        ws.set_symbols(symbols);
         ws.ws_connect_async(is_shutdown_arc, fun, &write_tx_am, write_rx).await.expect("链接失败");
     });
 }
 
-async fn on_data(_core_arc_clone: Arc<Mutex<Core>>,
-                 _update_flag_u: &mut Decimal,
-                 response: ResponseData) {
+async fn on_public_data(core_arc: Arc<Mutex<Core>>, multiplier: &Decimal, _run_symbol: &String, response: &ResponseData) {
     let mut trace_stack = TraceStack::new(response.time, response.ins);
     trace_stack.on_after_span_line();
 
     match response.channel.as_str() {
-        // "aggTrade" => {
-        //     // let trade: OriginalTradeBa = serde_json::from_str(data.data.as_str()).unwrap();
-        //     // let name = data.label.clone();
-        //
-        //     // 订单流逻辑
-        //     // on_trade(trade.clone(), core_arc_clone.clone()).await;
-        //
-        //     // 原本的逻辑
-        //     // let mut core = core_arc_clone.lock().await;
-        //     // let str = data.label.clone();
-        //     // if core.is_update.contains_key(&data.label) && *core.is_update.get(str.as_str()).unwrap() {
-        //     //     *_max_buy = Decimal::ZERO;
-        //     //     *_min_sell = Decimal::ZERO;
-        //     //     core.is_update.remove(str.as_str());
-        //     // }
-        //     // if trade.p > *_max_buy || *_max_buy == Decimal::ZERO {
-        //     //     *_max_buy = trade.p
-        //     // }
-        //     // if trade.p < *_min_sell || *_min_sell == Decimal::ZERO {
-        //     //     *_min_sell = trade.p
-        //     // }
-        //     // core.max_buy_min_sell_cache.insert(data.label, vec![*_max_buy, *_min_sell]);
-        // }
-        // "bookTicker" => {
-        //     trace_stack.set_source("binance_usdt_swap.bookTicker".to_string());
-        //     // 将ticker数据转换为模拟深度
-        //     let special_depth = standard::handle_info::HandleSwapInfo::handle_book_ticker(BinanceSwap, &response);
-        //     trace_stack.on_after_format();
-        //
-        //     on_special_depth(core_arc_clone, update_flag_u, &response.label, &mut trace_stack, &special_depth).await;
-        // }
+        "aggTrade" => {
+            trace_stack.set_source("binance_usdt_swap.aggTrade".to_string());
+            let mut trades = ExchangeStructHandler::trades_handle(ExchangeEnum::BinanceSwap, response, multiplier);
+            trace_stack.on_after_format();
+
+            for trade in trades.iter_mut() {
+                let core_arc_clone = core_arc.clone();
+
+                on_trade(core_arc_clone, &response.label, &mut trace_stack, &trade).await;
+            }
+        }
+        "bookTicker" => {
+            trace_stack.set_source("binance_usdt_swap.bookTicker".to_string());
+            // 将ticker数据转换为模拟深度
+            let depth = ExchangeStructHandler::book_ticker_handle(ExchangeEnum::BinanceSwap, response, multiplier);
+            trace_stack.on_after_format();
+
+            on_depth(core_arc, &response.label, &mut trace_stack, &depth).await;
+        }
         // "depth" => {
         //     trace_stack.set_source("binance_usdt_swap.depth".to_string());
         //     // 将depth数据转换为模拟深度

+ 1 - 1
strategy/src/clear_core.rs

@@ -185,7 +185,7 @@ impl ClearCore {
                 max_qty: Default::default(),
                 min_notional: Default::default(),
                 max_notional: Default::default(),
-                ct_val: Default::default(),
+                multiplier: Default::default(),
                 amount_size: Default::default(),
             },
             platform_rest: match exchange.as_str() {

+ 1 - 1
strategy/src/core.rs

@@ -187,7 +187,7 @@ impl Core {
                 max_qty: Default::default(),
                 min_notional: Default::default(),
                 max_notional: Default::default(),
-                ct_val: Default::default(),
+                multiplier: Default::default(),
                 amount_size: Default::default(),
             },
             platform_rest: match exchange.as_str() {

+ 9 - 8
strategy/src/gate_usdt_swap.rs

@@ -66,12 +66,13 @@ pub async fn gate_swap_run(is_shutdown_arc: Arc<AtomicBool>,
 
         // 读取数据
         let core_arc_clone = Arc::clone(&core_arc);
+        // amount_size可能是要取合约乘数啊
         let multiplier = core_arc_clone.lock().await.platform_rest.get_self_market().amount_size;
         let run_symbol = symbols.clone()[0].clone();
 
         let fun = move |data: ResponseData| {
-            let core_arc_cc = core_arc_clone.clone();
             // 在 async 块之前克隆 Arc
+            let core_arc_cc = core_arc_clone.clone();
             let mul = multiplier.clone();
             let rs = run_symbol.clone();
 
@@ -79,7 +80,7 @@ pub async fn gate_swap_run(is_shutdown_arc: Arc<AtomicBool>,
                 on_data(core_arc_cc,
                         &mul,
                         &rs,
-                        data,
+                        &data,
                 ).await
             }
         };
@@ -93,7 +94,7 @@ pub async fn gate_swap_run(is_shutdown_arc: Arc<AtomicBool>,
 async fn on_data(core_arc: Arc<Mutex<Core>>,
                  multiplier: &Decimal,
                  run_symbol: &String,
-                 response: ResponseData) {
+                 response: &ResponseData) {
     let mut trace_stack = TraceStack::new(response.time, response.ins);
     trace_stack.on_after_span_line();
 
@@ -107,14 +108,14 @@ async fn on_data(core_arc: Arc<Mutex<Core>>,
         // }
         "futures.book_ticker" => {
             trace_stack.set_source("gate_usdt_swap.book_ticker".to_string());
-            let depth = ExchangeStructHandler::book_ticker_handle(ExchangeEnum::GateSwap, &response, multiplier);
+            let depth = ExchangeStructHandler::book_ticker_handle(ExchangeEnum::GateSwap, response, multiplier);
             trace_stack.on_after_format();
 
             on_depth(core_arc, &response.label, &mut trace_stack, &depth).await;
         }
         "futures.trades" => {
             trace_stack.set_source("gate_usdt_swap.trades".to_string());
-            let mut trades = ExchangeStructHandler::trades_handle(ExchangeEnum::GateSwap, &response, multiplier);
+            let mut trades = ExchangeStructHandler::trades_handle(ExchangeEnum::GateSwap, response, multiplier);
             trace_stack.on_after_format();
 
             for trade in trades.iter_mut() {
@@ -124,13 +125,13 @@ async fn on_data(core_arc: Arc<Mutex<Core>>,
             }
         }
         "futures.balances" => {
-            let account = ExchangeStructHandler::account_info_handle(ExchangeEnum::GateSwap, &response, &run_symbol);
+            let account = ExchangeStructHandler::account_info_handle(ExchangeEnum::GateSwap, response, &run_symbol);
             let mut core = core_arc.lock().await;
             core.update_equity(account).await;
         }
         "futures.orders" => {
             trace_stack.set_source("gate_swap.orders".to_string());
-            let orders = ExchangeStructHandler::order_handle(ExchangeEnum::GateSwap, &response, multiplier);
+            let orders = ExchangeStructHandler::order_handle(ExchangeEnum::GateSwap, response, multiplier);
 
             let mut order_infos:Vec<OrderInfo> = Vec::new();
             for mut order in orders.order {
@@ -150,7 +151,7 @@ async fn on_data(core_arc: Arc<Mutex<Core>>,
             }
         }
         "futures.positions" => {
-            let positions = ExchangeStructHandler::position_handle(ExchangeEnum::GateSwap, &response, multiplier);
+            let positions = ExchangeStructHandler::position_handle(ExchangeEnum::GateSwap, response, multiplier);
             let mut core = core_arc.lock().await;
             core.update_position(positions).await;
         }