JiahengHe 1 рік тому
батько
коміт
b6cfcf6b8f

+ 1 - 1
exchanges/src/binance_swap_rest.rs

@@ -272,7 +272,7 @@ impl BinanceSwapRest {
     }
 
     //生成 listenKey
-    pub async fn get_listenKey(&mut self) -> ResponseData {
+    pub async fn get_listen_key(&mut self) -> ResponseData {
         let  params = serde_json::json!({
          });
         let data = self.request("POST".to_string(),

+ 10 - 10
exchanges/src/binance_swap_ws.rs

@@ -130,13 +130,13 @@ impl BinanceSwapWs {
     /*****************************************工具函数********************************************************/
     /*******************************************************************************************************/
     //订阅枚举解析
-    pub fn enum_to_string_pr(symbol: String, subscribe_type: BinanceSwapSubscribeType, listenKey: String) -> String {
+    pub fn enum_to_string_pr(_symbol: String, subscribe_type: BinanceSwapSubscribeType, listen_key: String) -> String {
         match subscribe_type {
             BinanceSwapSubscribeType::PrBalance => {
-                format!("{}@balance", listenKey)
+                format!("{}@balance", listen_key)
             }
             BinanceSwapSubscribeType::PrAccount => {
-                format!("{}@account", listenKey)
+                format!("{}@account", listen_key)
             }
             _ => { "".to_string() }
         }
@@ -183,29 +183,29 @@ impl BinanceSwapWs {
         }
     }
     //获取 listenKey
-    pub async fn get_listenKey(&self) -> String {
-        let req = self.rest_swap.clone().unwrap().get_listenKey().await;
+    pub async fn get_listen_key(&self) -> String {
+        let req = self.rest_swap.clone().unwrap().get_listen_key().await;
         if req.code.as_str() != "200" {
             trace!("get_listenKey获取失败(私有订阅必要参数!)~!");
             "".to_string()
         } else {
             let body = req.data.clone();
             let json_value: serde_json::Value = serde_json::from_str(&body).unwrap();
-            let listenKey = json_value["listenKey"].as_str().unwrap().to_string();
-            listenKey
+            let listen_key = json_value["listenKey"].as_str().unwrap().to_string();
+            listen_key
         }
     }
     //私人订阅信息生成
     pub async fn get_subscription_pr(&self) -> String {
-        let listenKey = self.get_listenKey().await;
+        let listen_key = self.get_listen_key().await;
 
-        if listenKey.len() == 0 {
+        if listen_key.len() == 0 {
             return "".to_string();
         }
         let mut params = vec![];
         for symbol in &self.symbol_s {
             for subscribe_type in &self.subscribe_types {
-                let ty_str = Self::enum_to_string_pr(symbol.clone(), subscribe_type.clone(), listenKey.clone());
+                let ty_str = Self::enum_to_string_pr(symbol.clone(), subscribe_type.clone(), listen_key.clone());
                 params.push(ty_str);
             }
         }

+ 122 - 4
standard/src/binance_handle.rs

@@ -1,8 +1,11 @@
 use std::str::FromStr;
 use rust_decimal::Decimal;
 use rust_decimal_macros::dec;
+use serde_json::{Value, from_str, from_value};
+use tracing::{debug, error};
 use exchanges::response_base::ResponseData;
-use crate::{MarketOrder, SpecialDepth, SpecialTicker};
+use global::trace_stack::TraceStack;
+use crate::{Account, MarketOrder, Order, Position, PositionModeEnum, SpecialDepth, SpecialOrder, SpecialTicker};
 use crate::exchange::ExchangeEnum;
 use crate::handle_info::HandleSwapInfo;
 
@@ -10,11 +13,11 @@ use crate::handle_info::HandleSwapInfo;
 // 处理特殊Ticker信息
 pub fn handle_special_ticker(res_data: ResponseData) -> SpecialDepth {
     let res_data_str = res_data.data;
-    let res_data_json: serde_json::Value = serde_json::from_str(&*res_data_str).unwrap();
+    let res_data_json: Value = from_str(&*res_data_str).unwrap();
     format_special_ticker(res_data_json, res_data.label)
 }
 
-pub fn format_special_ticker(data: serde_json::Value, label: String) -> SpecialDepth {
+pub fn format_special_ticker(data: Value, label: String) -> SpecialDepth {
     let bp = Decimal::from_str(data["b"].as_str().unwrap()).unwrap();
     let bq = Decimal::from_str(data["B"].as_str().unwrap()).unwrap();
     let ap = Decimal::from_str(data["a"].as_str().unwrap()).unwrap();
@@ -40,7 +43,7 @@ pub fn handle_special_depth(res_data: ResponseData) -> SpecialDepth {
 }
 
 // 格式化深度信息
-pub fn format_depth_items(value: serde_json::Value) -> Vec<MarketOrder> {
+pub fn format_depth_items(value: Value) -> Vec<MarketOrder> {
     let mut depth_items: Vec<MarketOrder> = vec![];
     for value in value.as_array().unwrap() {
         depth_items.push(MarketOrder {
@@ -49,4 +52,119 @@ pub fn format_depth_items(value: serde_json::Value) -> Vec<MarketOrder> {
         })
     }
     return depth_items;
+}
+
+pub fn handle_account_position_info(res_data: ResponseData, symbol: String, ct_val: Decimal) -> (Account, Vec<Position>) {
+    let res_data_str = res_data.data;
+    let res_data_json: Value = from_str(&res_data_str).unwrap();
+    let b: Vec<Value> = from_value(res_data_json["B"].clone()).unwrap();
+    let account = format_account_info(b, symbol);
+    let mut positions: Vec<Position> = Vec::new();
+    if res_data_json.get("P").is_some() {
+        let position_arr: Vec<Value> = from_value(res_data_json["P"].clone()).unwrap();
+        positions = position_arr.iter().map(|item| { format_position_item(item, ct_val) }).collect();
+    }
+    (account, positions)
+}
+
+pub fn format_account_info(data: Vec<Value>, symbol: String) -> Account {
+    if data.is_empty(){
+        error!("Binance:格式化usdt余额信息错误! 格式化数据为空");
+    }
+    let upper_str = symbol.to_uppercase();
+    let symbol_array: Vec<&str> = upper_str.split("_").collect();
+    let balance_info = data.iter().find(|&item| item["a"].as_str().unwrap() == symbol_array[1]);
+    match balance_info {
+        None => {
+            error!("Binance:格式化usdt余额信息错误!\nformat_account_info: data={:?}", balance_info);
+            panic!("Binance:格式化usdt余额信息错误!\nformat_account_info: data={:?}", balance_info)
+        }
+        Some(value) => {
+            let balance = Decimal::from_str(&value["wb"].as_str().unwrap().to_string()).unwrap();
+            Account {
+                coin: symbol_array[1].to_string(),
+                balance,
+                available_balance: Decimal::ZERO,
+                frozen_balance: Decimal::ZERO,
+                stocks: Decimal::ZERO,
+                available_stocks: Decimal::ZERO,
+                frozen_stocks: Decimal::ZERO,
+            }
+        }
+    }
+}
+
+pub fn format_position_item(position: &Value, ct_val: Decimal) -> Position {
+    let mut position_mode = match position["ps"].as_str().unwrap_or("") {
+        "BOTH" => PositionModeEnum::Both,
+        "LONG" => PositionModeEnum::Long,
+        "SHORT" => PositionModeEnum::Short,
+        _ => {
+            error!("Binance:格式化持仓模式错误!\nformat_position_item:position={:?}", position);
+            panic!("Binance:格式化持仓模式错误!\nformat_position_item:position={:?}", position)
+        }
+    };
+    let symbol_mapper =  position["s"].as_str().unwrap().to_string();
+    let currency = "USDT";
+    let coin = &symbol_mapper[..symbol_mapper.find(currency).unwrap_or(0)];
+    let size = Decimal::from_str(&position["pa"].as_f64().unwrap().to_string()).unwrap();
+    let amount = size * ct_val;
+    match position_mode {
+        PositionModeEnum::Both => {
+            position_mode = match amount {
+                amount if amount > Decimal::ZERO => PositionModeEnum::Long,
+                amount if amount < Decimal::ZERO => PositionModeEnum::Short,
+                _ => { PositionModeEnum::Both }
+            }
+        }
+        _ => {}
+    }
+    Position {
+        symbol: format!{"{}_{}", coin, currency},
+        margin_level: Decimal::from_str(&position["leverage"].as_f64().unwrap().to_string()).unwrap(),
+        amount,
+        frozen_amount: Decimal::ZERO,
+        price: Decimal::from_str(&position["ep"].as_f64().unwrap().to_string()).unwrap(),
+        profit: Decimal::from_str(&position["up"].as_f64().unwrap().to_string()).unwrap(),
+        position_mode,
+        margin: Decimal::from_str(&position["iw"].as_f64().unwrap().to_string()).unwrap(),
+    }
+}
+
+
+// 处理order信息
+pub fn handle_order(res_data: ResponseData, ct_val: Decimal) -> SpecialOrder {
+    let res_data_str = res_data.data;
+    let order: Value = from_str(&*res_data_str).unwrap();
+    debug!("format-order-start, binance_handle");
+    debug!(?order);
+    let status = order["X"].as_str().unwrap_or("");
+    let text = order["c"].as_str().unwrap_or("");
+    let size = Decimal::from_str(order["q"].as_str().unwrap()).unwrap();
+    let right = Decimal::from_str(order["z"].as_str().unwrap()).unwrap();
+    let price = Decimal::from_str(order["p"].as_str().unwrap()).unwrap();
+    let amount = size * ct_val;
+    let avg_price = Decimal::from_str(order["ap"].as_str().unwrap()).unwrap();
+    let deal_amount = right * ct_val;
+    let custom_status = if ["CANCELED", "FILLED", "EXPIRED"].contains(&status) { "REMOVE".to_string() } else if status == "NEW" { "NEW".to_string() } else {
+        "NULL".to_string()
+    };
+    let rst_order = Order {
+        id: format!("{}", order["i"].as_str().unwrap()),
+        custom_id: text.replace("t-my-custom-id_", "").replace("t-", ""),
+        price,
+        amount,
+        deal_amount,
+        avg_price,
+        status: custom_status,
+        order_type: "limit".to_string(),
+        trace_stack: TraceStack::default().on_special("301 binance_handle".to_string()),
+    };
+
+    debug!(?rst_order);
+    debug!("format-order-end, binance_handle");
+    SpecialOrder {
+        name: res_data.label,
+        order: vec![rst_order],
+    }
 }

+ 1 - 4
standard/src/bybit_swap_handle.rs

@@ -23,7 +23,7 @@ pub fn format_account_info(data: Vec<serde_json::Value>, symbol: String) -> Acco
             panic!("Bybit:格式化统一账户信息错误!\nformat_account_info: data={:?}", data)
         }
         Some(val) =>{
-            let arr: Vec<Value> = serde_json::from_value(val["coin"].clone()).unwrap();
+            let arr: Vec<Value> = from_value(val["coin"].clone()).unwrap();
             let upper_str = symbol.to_uppercase();
             let symbol_array: Vec<&str> = upper_str.split("_").collect();
             let balance_info = arr.iter().find(|&item| item["coin"].as_str().unwrap() == symbol_array[1]);
@@ -47,9 +47,6 @@ pub fn format_account_info(data: Vec<serde_json::Value>, symbol: String) -> Acco
             }
         }
     }
-
-
-
 }
 
 // 处理position信息

+ 1 - 2
standard/src/handle_info.rs

@@ -123,8 +123,7 @@ impl HandleSwapInfo {
     pub fn handle_order(exchange: ExchangeEnum, res_data: ResponseData, ct_val: Decimal) -> SpecialOrder {
         match exchange {
             ExchangeEnum::BinanceSwap => {
-                error!("暂未提供此交易所方法!handle_order:{:?}", exchange);
-                panic!("暂未提供此交易所方法!handle_order:{:?}", exchange);
+               binance_handle::handle_order(res_data, ct_val)
             }
             ExchangeEnum::GateSwap => {
                 gate_handle::handle_order(res_data, ct_val)

+ 85 - 15
strategy/src/binance_usdt_swap.rs

@@ -5,44 +5,63 @@ use rust_decimal::Decimal;
 use tokio::sync::Mutex;
 use exchanges::response_base::ResponseData;
 use global::trace_stack::TraceStack;
-use standard::exchange::ExchangeEnum::BinanceSwap;
-use crate::model::{OriginalTradeBa};
+use standard::exchange::ExchangeEnum::{BinanceSwap};
+use crate::model::{OrderInfo, OriginalTradeBa};
 use crate::quant::Quant;
-use exchanges::binance_swap_ws::{BinanceSwapSubscribeType, BinanceSwapWs, BinanceSwapWsType};
+use exchanges::binance_swap_ws::{BinanceSwapLogin, BinanceSwapSubscribeType, BinanceSwapWs, BinanceSwapWsType};
 use futures_util::StreamExt;
+use standard::binance_handle::handle_account_position_info;
+use standard::Position;
 use crate::exchange_disguise::{on_special_depth};
 
 // 参考 币安 合约 启动
-pub(crate) async fn reference_binance_swap_run(bool_v1 :Arc<AtomicBool>,
+pub(crate) async fn binance_swap_run(bool_v1 :Arc<AtomicBool>,
                                                quant_arc: Arc<Mutex<Quant>>,
                                                name: String,
                                                symbols: Vec<String>,
                                                is_colo: bool,
-                                               _exchange_params: BTreeMap<String, String>) {
+                                               exchange_params: BTreeMap<String, String>,
+                                               is_trade: bool,) {
     tokio::spawn(async move {
+        let run_symbol = symbols.clone()[0].clone();
         //创建读写通道
         let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
         let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded();
-        let mut ws = BinanceSwapWs::new_label(name, is_colo, None, BinanceSwapWsType::PublicAndPrivate);
+        let mut ws;
+
+        if is_trade {
+            let login_param = parse_btree_map_to_binance_swap_login(exchange_params);
+            ws = BinanceSwapWs::new_label(name, is_colo, Some(login_param),
+                                          BinanceSwapWsType::PublicAndPrivate);
+            ws.set_subscribe(vec![
+                BinanceSwapSubscribeType::PuBookTicker,
+                BinanceSwapSubscribeType::PuMarkPrice,
+                BinanceSwapSubscribeType::PrBalance,
+                BinanceSwapSubscribeType::PrAccount,
+            ]);
+        } else {
+            ws = BinanceSwapWs::new_label(name, is_colo, None, BinanceSwapWsType::PublicAndPrivate);
+            ws.set_subscribe(vec![
+                // BinanceSwapSubscribeType::PuDepth20levels100ms,
+                BinanceSwapSubscribeType::PuBookTicker,
+                // BinanceSwapSubscribeType::PuAggTrade
+            ]);
+        }
         ws.set_symbols(symbols);
-        ws.set_subscribe(vec![
-            // BinanceSwapSubscribeType::PuDepth20levels100ms,
-            BinanceSwapSubscribeType::PuBookTicker,
-            // BinanceSwapSubscribeType::PuAggTrade
-        ]);
 
         //读取数据
-        let bot_arc_clone = Arc::clone(&quant_arc);
         tokio::spawn(async move {
+            let bot_arc_clone = Arc::clone(&quant_arc);
             // ticker
             let mut update_flag_u = Decimal::ZERO;
             // trade
             let mut max_buy = Decimal::ZERO;
             let mut min_sell = Decimal::ZERO;
-
+            let symbol_ = run_symbol;
+            let ct_val = bot_arc_clone.lock().await.platform_rest.get_self_market().ct_val;
             loop {
                 if let Some(data) = read_rx.next().await {
-                    on_data(bot_arc_clone.clone(), &mut update_flag_u, &mut max_buy, &mut min_sell, data).await;
+                    on_data(bot_arc_clone.clone(), &mut update_flag_u, &mut max_buy, &mut min_sell, data, symbol_.clone(), ct_val).await;
                 }
             }
         });
@@ -57,7 +76,9 @@ async fn on_data(bot_arc_clone: Arc<Mutex<Quant>>,
                  update_flag_u: &mut Decimal,
                  _max_buy: &mut Decimal,
                  _min_sell: &mut Decimal,
-                 data: ResponseData) {
+                 data: ResponseData,
+                 run_symbol: String,
+                 ct_val: Decimal) {
     let mut trace_stack = TraceStack::default();
     trace_stack.on_after_network(chrono::Utc::now().timestamp_micros());
     trace_stack.on_before_quant();
@@ -104,5 +125,54 @@ async fn on_data(bot_arc_clone: Arc<Mutex<Quant>>,
         trace_stack.on_after_format();
 
         on_special_depth(bot_arc_clone, update_flag_u, data.label.clone(), trace_stack, special_depth).await;
+    } else if data.channel == "ACCOUNT_UPDATE" {
+        let result =  handle_account_position_info(data, run_symbol, ct_val);
+        {
+            let mut quant = bot_arc_clone.lock().await;
+            quant.update_equity(result.0);
+            let positions: Vec<Position> = result.1;
+            if positions.len() > 0 {
+                quant.update_position(positions);
+            }
+        }
+    } else if data.channel == "ORDER_TRADE_UPDATE" {
+        trace_stack.on_before_format();
+        let orders = standard::handle_info::HandleSwapInfo::handle_order(BinanceSwap, data.clone(), ct_val.clone());
+        trace_stack.on_after_format();
+
+        let mut order_infos:Vec<OrderInfo> = Vec::new();
+        for order in orders.order {
+            if order.status == "NULL" {
+                continue;
+            }
+            let order_info = OrderInfo {
+                symbol: "".to_string(),
+                amount: order.amount.abs(),
+                side: "".to_string(),
+                price: order.price,
+                client_id: order.custom_id,
+                filled_price: order.avg_price,
+                filled: order.deal_amount.abs(),
+                order_id: order.id,
+                local_time: 0,
+                create_time: 0,
+                status: order.status,
+                fee: Default::default(),
+                trace_stack: Default::default(),
+            };
+            order_infos.push(order_info);
+        }
+
+        {
+            let mut quant = bot_arc_clone.lock().await;
+            quant.update_order(order_infos, trace_stack);
+        }
+    }
+}
+
+fn parse_btree_map_to_binance_swap_login(exchange_params: BTreeMap<String, String>) -> BinanceSwapLogin {
+    BinanceSwapLogin {
+        api_key: exchange_params.get("access_key").unwrap().clone(),
+        api_secret: exchange_params.get("secret_key").unwrap().clone(),
     }
 }

+ 5 - 2
strategy/src/exchange_disguise.rs

@@ -6,7 +6,7 @@ use tokio::sync::Mutex;
 use global::trace_stack::TraceStack;
 use standard::SpecialDepth;
 use crate::binance_spot::reference_binance_spot_run;
-use crate::binance_usdt_swap::reference_binance_swap_run;
+use crate::binance_usdt_swap::binance_swap_run;
 use crate::bitget_spot::bitget_spot_run;
 use crate::bybit_usdt_swap::bybit_swap_run;
 use crate::gate_swap::gate_swap_run;
@@ -24,6 +24,9 @@ pub async fn run_transactional_exchange(bool_v1 :Arc<AtomicBool>,
                                         is_colo: bool,
                                         exchange_params: BTreeMap<String, String>) {
     match exchange_name.as_str() {
+        "binance_usdt_swap" => {
+            binance_swap_run(bool_v1, quant_arc, name, symbols, is_colo, exchange_params, true).await;
+        }
         "gate_usdt_swap" => {
             gate_swap_run(bool_v1, true, quant_arc, name, symbols, is_colo, exchange_params).await;
         }
@@ -56,7 +59,7 @@ pub async fn run_reference_exchange(bool_v1: Arc<AtomicBool>,
                                     exchange_params: BTreeMap<String, String>) {
     match exchange_name.as_str() {
         "binance_usdt_swap" => {
-            reference_binance_swap_run(bool_v1, quant_arc, name, symbols, is_colo, exchange_params).await;
+            binance_swap_run(bool_v1, quant_arc, name, symbols, is_colo, exchange_params, false).await;
         },
         "binance_spot" => {
             reference_binance_spot_run(bool_v1, quant_arc, name, symbols, is_colo, exchange_params).await;