浏览代码

phemex接入

JiahengHe 1 年之前
父节点
当前提交
953b7cbe2a

+ 12 - 38
exchanges/src/phemex_swap_ws.rs

@@ -41,11 +41,8 @@ pub enum PhemexSwapSubscribeType {
     // K线数据
     PuFuturesRecords,
 
-    PrFuturesOrders,
-    PrFuturesPositions,
-    PrFuturesBalances,
+    // 账户、订单、仓位信息3合一
     PrFuturesAccountOrderPosition,
-
 }
 
 //账号信息
@@ -128,7 +125,7 @@ impl PhemexSwapWs {
     pub fn set_symbols(&mut self, mut b_array: Vec<String>) {
         for symbol in b_array.iter_mut() {
             // 大写
-            *symbol = symbol.to_string();
+            *symbol = symbol.to_uppercase().replace("1000", "u1000").to_string();
             // 字符串替换
             *symbol = symbol.replace("_", "");
         }
@@ -141,9 +138,6 @@ impl PhemexSwapWs {
                 PhemexSwapSubscribeType::PuFuturesRecords => false,
                 PhemexSwapSubscribeType::PuFuturesOrderBook => false,
 
-                PhemexSwapSubscribeType::PrFuturesOrders => true,
-                PhemexSwapSubscribeType::PrFuturesPositions => true,
-                PhemexSwapSubscribeType::PrFuturesBalances => true,
                 PhemexSwapSubscribeType::PrFuturesAccountOrderPosition => true,
 
             } {
@@ -189,33 +183,6 @@ impl PhemexSwapWs {
                 })
             }
 
-            PhemexSwapSubscribeType::PrFuturesOrders => {
-                json!({
-                    "id": 999,
-                    "method": "trade_p.subscribe",
-                    "params": [
-                        symbol
-                    ]
-                })
-            }
-            PhemexSwapSubscribeType::PrFuturesPositions => {
-                json!({
-                    "id": 999,
-                    "method": "trade_p.subscribe",
-                    "params": [
-                        symbol
-                    ]
-                })
-            }
-            PhemexSwapSubscribeType::PrFuturesBalances => {
-                json!({
-                    "id": 999,
-                    "method": "trade_p.subscribe",
-                    "params": [
-                        symbol
-                    ]
-                })
-            }
             PhemexSwapSubscribeType::PrFuturesAccountOrderPosition => {
                 json!({
                     "id": 999,
@@ -440,7 +407,7 @@ impl PhemexSwapWs {
                             res_data.code = 200;
                             res_data.data = json_value.clone();
                             res_data.channel = "futures.order_book".to_string();
-
+                            res_data.data_type = "incremental".to_string();
                         } else if !json_value["accounts_p"].is_null() && !json_value["orders_p"].is_null()  && !json_value["positions_p"].is_null(){
                             res_data.code = 200;
                             res_data.data = json_value.clone();
@@ -452,8 +419,15 @@ impl PhemexSwapWs {
                         return res_data;
                     }
                     "snapshot" => {
-                        res_data.code = 400;
-                        res_data.message = "(快照)未知解析".to_string();
+                        if !json_value["orderbook_p"].is_null() {
+                            res_data.code = 200;
+                            res_data.data = json_value.clone();
+                            res_data.channel = "futures.order_book".to_string();
+                            res_data.data_type = "snapshot".to_string();
+                        } else {
+                            res_data.code = 400;
+                            res_data.message = "(快照)未知解析".to_string();
+                        }
                         return res_data;
                     }
                     _ => {}

+ 1 - 1
global/src/log_utils.rs

@@ -61,7 +61,7 @@ pub fn send_remote_err_log(msg: String) {
                 // let body = _resp.text().await.unwrap();
             }
             Err(err) => {
-                warn!("log的error监听器发送远端报错失败:{:?}", err);
+                // warn!("log的error监听器发送远端报错失败:{:?}", err);
             }
         }
     });

+ 21 - 6
standard/src/phemex_swap.rs

@@ -208,8 +208,8 @@ impl Platform for PhemexSwap {
         let symbol_format = utils::format_symbol(symbol.clone(), "").replace("1000", "u1000");
 
         let params = json!({});
-        // let response = self.request.get_market(params).await;
-        let response = get_market_info(params);
+        let response = self.request.get_market(params).await;
+        // let response = get_market_info(params);
         if response.code != 200 {
             return Err(Error::new(ErrorKind::NotFound, format!("phemex_swap 获取市场行情异常{:?}", response).to_string()));
         }
@@ -395,7 +395,7 @@ impl Platform for PhemexSwap {
         if response.code != 200 {
             return Err(Error::new(ErrorKind::NotFound, format!("phemex_swap 批量撤销订单失败 res_data: {:?}", response)));
         };
-        info!("{}", response.data.to_string());
+        // info!("{}", response.data.to_string());
         return Ok(vec![]);
     }
 
@@ -590,8 +590,23 @@ pub fn format_position_item(position: &Value, ct_val: Decimal) -> Position {
 }
 
 pub fn format_order_item(order: Value, ct_val: Decimal) -> Order {
-    let id = order["orderID"].as_str().unwrap().to_string();
-    let custom_id = order["clOrdID"].as_str().unwrap().to_string();
+    // info!("需要格式化的订单数据: {}", order);
+    let id = match order["orderId"].as_str() {
+        Some(val) => {
+            val.to_string()
+        },
+        None => {
+            order["orderID"].as_str().unwrap().to_string()
+        }
+    };
+    let custom_id = match order["clOrdId"].as_str() {
+        Some(val) => {
+            val.to_string()
+        },
+        None => {
+            order["clOrdID"].as_str().unwrap().to_string()
+        }
+    };
     let price = Decimal::from_str(order["priceRp"].as_str().unwrap()).unwrap();
     let amount = Decimal::from_str(order["orderQtyRq"].as_str().unwrap()).unwrap() * ct_val;
     let deal_amount = Decimal::from_str(order["cumQtyRq"].as_str().unwrap()).unwrap() * ct_val;
@@ -618,7 +633,7 @@ pub fn format_order_item(order: Value, ct_val: Decimal) -> Order {
         deal_amount,
         avg_price,
         status: custom_status,
-        order_type: order["orderType"].as_str().unwrap().to_lowercase(),
+        order_type: "limit".to_string(),
         trace_stack: TraceStack::new(0, Instant::now()).on_special("669 trace_stack".to_string()),
     };
     return result;

+ 3 - 3
standard/src/phemex_swap_handle.rs

@@ -52,7 +52,7 @@ pub fn handle_position(res_data: &ResponseData, ct_val: &Decimal) -> Vec<Positio
 pub fn format_position_item(position: &Value, ct_val: &Decimal) -> Position {
     let symbol = position["symbol"].as_str().unwrap().to_string();
     let margin_level = Decimal::from_str(position["leverageRr"].as_str().unwrap()).unwrap();
-    let price = Decimal::from_str(position["avgEntryPrice"].as_str().unwrap()).unwrap();
+    let price = Decimal::from_str(position["avgEntryPriceRp"].as_str().unwrap()).unwrap();
     let profit = Decimal::from_str(position["curTermRealisedPnlRv"].as_str().unwrap()).unwrap();
     let margin = Decimal::from_str(position["positionMarginRv"].as_str().unwrap()).unwrap();
     let position_mode = match position["posSide"].as_str().unwrap_or("") {
@@ -116,8 +116,8 @@ pub fn format_order_item(order: &Value, ct_val: Decimal) -> Order {
         deal_amount,
         avg_price,
         status: custom_status,
-        order_type: order["orderType"].as_str().unwrap().to_lowercase(),
-        trace_stack: TraceStack::new(0, Instant::now()).on_special("114 trace_stack".to_string()),
+        order_type: "limit".to_string(),
+        trace_stack: TraceStack::new(0, Instant::now()).on_special("120 trace_stack".to_string()),
     };
     return result;
 }

+ 5 - 1
strategy/src/core.rs

@@ -22,7 +22,7 @@ use global::public_params::{ASK_PRICE_INDEX, BID_PRICE_INDEX, LENGTH};
 use global::trace_stack::TraceStack;
 use standard::{Account, Market, Order, OrderCommand, Platform, Position, PositionModeEnum, SpecialTicker, Ticker};
 use standard::exchange::{Exchange};
-use standard::exchange::ExchangeEnum::{BinanceSwap, BitgetSwap, BybitSwap, CoinexSwap, GateSwap, HtxSwap, KucoinSwap};
+use standard::exchange::ExchangeEnum::{BinanceSwap, BitgetSwap, BybitSwap, CoinexSwap, GateSwap, HtxSwap, KucoinSwap, PhemexSwap};
 
 use crate::model::{LocalPosition, OrderInfo, TokenParam};
 use crate::predictor::Predictor;
@@ -236,6 +236,10 @@ impl Core {
                 "htx_usdt_swap" => {
                     Exchange::new(HtxSwap, symbol, params.colo != 0i8, exchange_params, order_sender, error_sender).await
                 }
+                "phemex_usdt_swap" => {
+                    standard::utils::proxy_handle(Some("phemex"));
+                    Exchange::new(PhemexSwap, symbol, params.colo != 0i8, exchange_params, order_sender, error_sender).await
+                }
                 _ => {
                     error!("203未找到对应的交易所rest枚举!");
                     panic!("203未找到对应的交易所rest枚举!");

+ 119 - 35
strategy/src/phemex_usdt_swap.rs

@@ -1,4 +1,5 @@
-use tracing::{error};
+use std::cmp::Ordering;
+use tracing::{error, info};
 use std::collections::BTreeMap;
 use std::sync::Arc;
 use std::sync::atomic::AtomicBool;
@@ -9,6 +10,8 @@ use exchanges::phemex_swap_ws::{PhemexSwapLogin, PhemexSwapSubscribeType, Phemex
 use exchanges::response_base::ResponseData;
 use global::trace_stack::{TraceStack};
 use standard::exchange::ExchangeEnum::{PhemexSwap};
+use standard::handle_info::DepthParam;
+use standard::MarketOrder;
 use crate::model::{OrderInfo};
 use crate::core::Core;
 use crate::exchange_disguise::on_special_depth;
@@ -30,18 +33,23 @@ pub async fn phemex_swap_run(is_shutdown_arc: Arc<AtomicBool>,
     let name_clone = name.clone();
     let core_arc_clone = core_arc.clone();
     let symbols_clone = symbols.clone();
-    let symbol = symbols.clone()[0].clone();
     spawn(async move {
         // 构建链接ws
         let mut bg_public = PhemexSwapWs::new_with_tag(name_clone,is_colo, None, PhemexSwapWsType::PublicAndPrivate);
 
         // 消费数据的函数
         let mut update_flag_u = Decimal::ZERO;
+        let depth_asks = Arc::new(Mutex::new(Vec::new()));
+        let depth_bids = Arc::new(Mutex::new(Vec::new()));
+
         let fun = move |data: ResponseData| {
+            let depth_asks = depth_asks.clone();
+            let depth_bids = depth_bids.clone();
             let core_arc_cc = core_arc_clone.clone();
-            let rs = symbol.clone();
             async move {
-                on_public_data(core_arc_cc, &mut update_flag_u, rs, data).await
+                let mut depth_asks = depth_asks.lock().await;
+                let mut depth_bids = depth_bids.lock().await;
+                on_public_data(core_arc_cc, &mut update_flag_u, data, &mut depth_asks, &mut depth_bids).await
             }
         };
 
@@ -98,37 +106,58 @@ async fn on_private_data(core_arc_clone: Arc<Mutex<Core>>,
     let mut trace_stack = TraceStack::new(response.time, response.ins);
     trace_stack.on_after_span_line();
 
-    let order_channel = "orders_cross";
-    let position_channel = "positions_cross";
-    let balance_channel = "accounts_cross";
-    if response.channel.contains(order_channel) { // 订单频道
+    if response.channel == "accounts_orders_positions" { // 订单频道
+        info!("3合一推送 {}", response.data);
         trace_stack.set_source("phemex_swap.orders".to_string());
-        let orders = standard::handle_info::HandleSwapInfo::handle_order(PhemexSwap, response.clone(), ct_val.clone());
-        let mut order_infos:Vec<OrderInfo> = Vec::new();
-        for mut order in orders.order {
-            if order.status == "NULL" {
-                error!("phemex_usdt_swap 未识别的订单状态:{:?}", response);
-                continue;
+        let accounts = response.data["accounts_p"].as_array().unwrap_or(&vec![]).clone();
+        let positions = response.data["positions_p"].as_array().unwrap_or(&vec![]).clone();
+        let orders = response.data["orders_p"].as_array().unwrap_or(&vec![]).clone();
+
+        // 订单
+        if orders.len() > 0{
+            let mut order_res = response.clone();
+            order_res.data = order_res.data["orders_p"].clone();
+            let orders = standard::handle_info::HandleSwapInfo::handle_order(PhemexSwap, order_res, ct_val.clone());
+            let mut order_infos:Vec<OrderInfo> = Vec::new();
+            for mut order in orders.order {
+                if order.status == "NULL" {
+                    error!("phemex_usdt_swap 未识别的订单状态:{:?}", response);
+                    continue;
+                }
+                let mut order_info = OrderInfo::parse_order_to_order_info(&mut order);
+                order_info.trace_stack.source = "phemex_usdt_swap 111".to_string();
+                order_infos.push(order_info);
+            }
+
+            {
+                let mut core = core_arc_clone.lock().await;
+                core.update_order(order_infos, trace_stack).await;
             }
-            let mut order_info = OrderInfo::parse_order_to_order_info(&mut order);
-            order_info.trace_stack.source = "phemex_usdt_swap 118".to_string();
-            order_infos.push(order_info);
         }
 
-        {
-            let mut core = core_arc_clone.lock().await;
-            core.update_order(order_infos, trace_stack).await;
+        // 仓位
+        if positions.len() > 0 {
+            let mut position_res = response.clone();
+            position_res.data = position_res.data["positions_p"].clone();
+            let positions = standard::handle_info::HandleSwapInfo::handle_position(PhemexSwap, &position_res, &ct_val);
+
+            {
+                let mut core = core_arc_clone.lock().await;
+                core.update_position(positions).await;
+            }
         }
-    } else if response.channel.contains(position_channel) { // 仓位频道
-        let positions = standard::handle_info::HandleSwapInfo::handle_position(PhemexSwap, &response, &ct_val);
-        let mut core = core_arc_clone.lock().await;
 
-        core.update_position(positions).await;
-    } else if response.channel.contains(balance_channel)  { // 余额频道
-        let account = standard::handle_info::HandleSwapInfo::handle_account_info(PhemexSwap, &response, run_symbol);
-        let mut core = core_arc_clone.lock().await;
+        // 账户
+        if accounts.len() > 0 {
+            let mut account_res = response.clone();
+            account_res.data = account_res.data["accounts_p"].clone();
+            let account = standard::handle_info::HandleSwapInfo::handle_account_info(PhemexSwap, &account_res, run_symbol);
 
-        core.update_equity(account).await;
+            {
+                let mut core = core_arc_clone.lock().await;
+                core.update_equity(account).await;
+            }
+        }
     } else {
         error!("未知推送类型");
         error!(?response);
@@ -137,16 +166,32 @@ async fn on_private_data(core_arc_clone: Arc<Mutex<Core>>,
 
 async fn on_public_data(core_arc_clone: Arc<Mutex<Core>>,
                         update_flag_u: &mut Decimal,
-                        run_symbol: String,
-                        response: ResponseData) {
+                        response: ResponseData,
+                        depth_asks: &mut Vec<MarketOrder>,
+                        depth_bids: &mut Vec<MarketOrder>) {
     let mut trace_stack = TraceStack::new(response.time, response.ins);
     trace_stack.on_after_span_line();
-    let channel_symbol = run_symbol.replace("_", "-");
-    let depth_channel = format!("market.{}.depth.step0", channel_symbol.to_uppercase());
-    // public类型,目前只考虑订单流数据
-    if response.channel == depth_channel { // 深度频道
+    // public类型,目前只考虑订单簿数据
+    if response.channel == "futures.order_book" { // 深度频道
+        let label = response.label.clone();
+        let mut is_update = false;
+        let data_type = response.data_type.clone();
         trace_stack.set_source("phemex_usdt_swap.depth".to_string());
-        let special_depth = standard::handle_info::HandleSwapInfo::handle_special_depth(PhemexSwap, &response);
+        let mut depth_format: DepthParam = standard::handle_info::format_depth(PhemexSwap, &response);
+        // 是否是增量
+        if data_type == "incremental"  {
+            is_update = true;
+        }
+
+        if is_update {
+            update_order_book(depth_asks, depth_bids, depth_format.depth_asks, depth_format.depth_bids);
+        } else { // 全量
+            depth_asks.clear();
+            depth_asks.append(&mut depth_format.depth_asks);
+            depth_bids.clear();
+            depth_bids.append(&mut depth_format.depth_bids);
+        }
+        let special_depth = standard::handle_info::make_special_depth(label.clone(), depth_asks, depth_bids, depth_format.t, depth_format.create_at);
         trace_stack.on_after_format();
 
         on_special_depth(core_arc_clone, update_flag_u, &response.label, &mut trace_stack, &special_depth).await;
@@ -162,3 +207,42 @@ fn parse_btree_map_to_phemex_swap_login(exchange_params: BTreeMap<String, String
         secret_key: exchange_params.get("secret_key").unwrap().clone()
     }
 }
+
+fn update_order_book(depth_asks: &mut Vec<MarketOrder>, depth_bids: &mut Vec<MarketOrder>, asks : Vec<MarketOrder>, bids: Vec<MarketOrder>) {
+    for i in asks {
+        let index_of_value = depth_asks.iter().position(|x| x.price == i.price);
+        match index_of_value {
+            Some(index) => {
+                if i.amount == Decimal::ZERO {
+                    depth_asks.remove(index);
+                } else {
+                    depth_asks[index].amount = i.amount.clone();
+                }
+            },
+            None => {
+                depth_asks.push(i.clone());
+            },
+        }
+    }
+    for i in bids {
+        let index_of_value = depth_bids.iter().position(|x| x.price == i.price);
+        match index_of_value {
+            Some(index) => {
+                if i.amount == Decimal::ZERO {
+                    depth_bids.remove(index);
+                } else {
+                    depth_bids[index].amount = i.amount.clone();
+                }
+            },
+            None => {
+                depth_bids.push(i.clone());
+            },
+        }
+    }
+    depth_asks.sort_by(|a, b| a.price.partial_cmp(&b.price).unwrap_or(Ordering::Equal));
+    depth_bids.sort_by(|a, b| b.price.partial_cmp(&a.price).unwrap_or(Ordering::Equal));
+
+    // 限制总长度100
+    depth_asks.truncate(100);
+    depth_bids.truncate(100);
+}