浏览代码

gate也使用新架构了。

skyfffire 1 年之前
父节点
当前提交
54dc366375
共有 6 个文件被更改,包括 581 次插入575 次删除
  1. 349 344
      exchanges/src/gate_swap_ws.rs
  2. 8 8
      src/core_libs.rs
  3. 5 4
      strategy/src/binance_usdt_swap.rs
  4. 33 33
      strategy/src/exchange_disguise.rs
  5. 184 185
      strategy/src/gate_swap.rs
  6. 2 1
      tests/framework_3_0_test.rs

+ 349 - 344
exchanges/src/gate_swap_ws.rs

@@ -1,344 +1,349 @@
-// use std::sync::Arc;
-// use std::sync::atomic::AtomicBool;
-// use chrono::Utc;
-//
-// use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
-// use hex;
-// use hmac::{Hmac, Mac, NewMac};
-// use serde_json::{json, Value};
-// use sha2::Sha512;
-// use tokio::sync::Mutex;
-// use tokio_tungstenite::tungstenite::{Error, Message};
-// use tracing::{info, trace};
-//
-// use crate::response_base::ResponseData;
-// use crate::socket_tool::{AbstractWsMode, HeartbeatType};
-//
-// //类型
-// pub enum GateSwapWsType {
-//     PublicAndPrivate(String),
-// }
-//
-//
-// //订阅频道
-// #[derive(Clone)]
-// pub enum GateSwapSubscribeType {
-//     PuFuturesOrderBook,
-//     PuFuturesCandlesticks,
-//     PuFuturesTrades,
-//
-//     PrFuturesOrders(String),
-//     PrFuturesPositions(String),
-//     PrFuturesBalances(String),
-// }
-//
-// //账号信息
-// #[derive(Clone)]
-// #[allow(dead_code)]
-// pub struct GateSwapLogin {
-//     pub api_key: String,
-//     pub secret: String,
-// }
-//
-//
-// #[derive(Clone)]
-// pub struct GateSwapWs {
-//     //类型
-//     label: String,
-//     //地址
-//     address_url: String,
-//     //账号信息
-//     login_param: Option<GateSwapLogin>,
-//     //币对
-//     symbol_s: Vec<String>,
-//     //订阅
-//     subscribe_types: Vec<GateSwapSubscribeType>,
-//     //心跳间隔
-//     heartbeat_time: u64,
-// }
-//
-// impl GateSwapWs {
-//     /*******************************************************************************************************/
-//     /*****************************************获取一个对象****************************************************/
-//     /*******************************************************************************************************/
-//     pub fn new(is_colo: bool, login_param: Option<GateSwapLogin>, ws_type: GateSwapWsType) -> GateSwapWs {
-//         return GateSwapWs::new_label("default-GateSwapWs".to_string(), is_colo, login_param, ws_type);
-//     }
-//     pub fn new_label(label: String, is_colo: bool, login_param: Option<GateSwapLogin>, ws_type: GateSwapWsType) -> GateSwapWs
-//     {
-//         /*******公共频道-私有频道数据组装*/
-//         let address_url = match ws_type {
-//             GateSwapWsType::PublicAndPrivate(name) => {
-//                 if is_colo {
-//                     let url = format!("wss://fxws-private.gateapi.io/v4/ws/{}", name.to_string());
-//                     info!("开启高速通道:{:?}",url);
-//                     url
-//                 } else {
-//                     let url = format!("wss://fx-ws.gateio.ws/v4/ws/{}", name.to_string());
-//                     info!("走普通通道:{}",url);
-//                     url
-//                 }
-//             }
-//         };
-//
-//
-//         GateSwapWs {
-//             label,
-//             address_url,
-//             login_param,
-//             symbol_s: vec![],
-//             subscribe_types: vec![],
-//             heartbeat_time: 1000 * 10,
-//         }
-//     }
-//
-//     /*******************************************************************************************************/
-//     /*****************************************订阅函数********************************************************/
-//     /*******************************************************************************************************/
-//     //手动添加订阅信息
-//     pub fn set_subscribe(&mut self, subscribe_types: Vec<GateSwapSubscribeType>) {
-//         self.subscribe_types.extend(subscribe_types);
-//     }
-//     //手动添加币对
-//     pub fn set_symbols(&mut self, mut b_array: Vec<String>) {
-//         for symbol in b_array.iter_mut() {
-//             // 大写
-//             *symbol = symbol.to_uppercase();
-//             // 字符串替换
-//             *symbol = symbol.replace("-", "_");
-//         }
-//         self.symbol_s = b_array;
-//     }
-//     //频道是否需要登录
-//     fn contains_pr(&self) -> bool {
-//         for t in self.subscribe_types.clone() {
-//             if match t {
-//                 GateSwapSubscribeType::PuFuturesOrderBook => false,
-//                 GateSwapSubscribeType::PuFuturesCandlesticks => false,
-//                 GateSwapSubscribeType::PuFuturesTrades => false,
-//
-//                 GateSwapSubscribeType::PrFuturesOrders(_) => true,
-//                 GateSwapSubscribeType::PrFuturesPositions(_) => true,
-//                 GateSwapSubscribeType::PrFuturesBalances(_) => true,
-//             } {
-//                 return true;
-//             }
-//         }
-//         false
-//     }
-//
-//     /*******************************************************************************************************/
-//     /*****************************************工具函数********************************************************/
-//     /*******************************************************************************************************/
-//     //订阅枚举解析
-//     pub fn enum_to_string(symbol: String, subscribe_type: GateSwapSubscribeType, login_param: Option<GateSwapLogin>) -> Value {
-//         let time = chrono::Utc::now().timestamp();
-//         let mut access_key = "".to_string();
-//         let mut secret_key = "".to_string();
-//         match login_param {
-//             None => {}
-//             Some(param) => {
-//                 access_key = param.api_key.clone();
-//                 secret_key = param.secret.clone();
-//             }
-//         }
-//
-//         match subscribe_type {
-//             GateSwapSubscribeType::PuFuturesOrderBook => {
-//                 json!({
-//                     "time": time,
-//                     "channel": "futures.order_book",
-//                     "event": "subscribe",
-//                     "payload":  [symbol, "20", "0"]
-//                 })
-//             }
-//             GateSwapSubscribeType::PuFuturesCandlesticks => {
-//                 json!({
-//                     "time": time,
-//                     "channel": "futures.candlesticks",
-//                     "event": "subscribe",
-//                     "payload":  ["1m", symbol]
-//                 })
-//             }
-//             GateSwapSubscribeType::PrFuturesOrders(user_id) => {
-//                 json!({
-//                     "time": time,
-//                     "channel": "futures.orders",
-//                     "event": "subscribe",
-//                     "payload":  [user_id, symbol],
-//                     "auth": {
-//                         "method": "api_key",
-//                         "KEY": access_key,
-//                         "SIGN":Self::sign(secret_key.to_string(),
-//                               "futures.orders".to_string(),
-//                               "subscribe".to_string(),
-//                               time.to_string())
-//                     }
-//                 })
-//             }
-//             GateSwapSubscribeType::PrFuturesPositions(user_id) => {
-//                 json!({
-//                     "time": time,
-//                     "channel": "futures.positions",
-//                     "event": "subscribe",
-//                     "payload":  [user_id, symbol],
-//                     "auth": {
-//                         "method": "api_key",
-//                         "KEY": access_key,
-//                         "SIGN":Self::sign(secret_key.to_string(),
-//                               "futures.positions".to_string(),
-//                               "subscribe".to_string(),
-//                               time.to_string())
-//                     }
-//                 })
-//             }
-//             GateSwapSubscribeType::PrFuturesBalances(user_id) => {
-//                 json!({
-//                     "time": time,
-//                     "channel": "futures.balances",
-//                     "event": "subscribe",
-//                     "payload":  [user_id],
-//                     "auth": {
-//                         "method": "api_key",
-//                         "KEY": access_key,
-//                         "SIGN":Self::sign(secret_key.to_string(),
-//                               "futures.balances".to_string(),
-//                               "subscribe".to_string(),
-//                               time.to_string())
-//                     }
-//                 })
-//             }
-//             GateSwapSubscribeType::PuFuturesTrades => {
-//                 json!({
-//                     "time": time,
-//                     "channel": "futures.trades",
-//                     "event": "subscribe",
-//                     "payload":  [symbol]
-//                 })
-//             }
-//         }
-//     }
-//     //订阅信息生成
-//     pub fn get_subscription(&self) -> Vec<Value> {
-//         let mut args = vec![];
-//         for symbol in &self.symbol_s {
-//             for subscribe_type in &self.subscribe_types {
-//                 let ty_str = Self::enum_to_string(symbol.clone(),
-//                                                   subscribe_type.clone(),
-//                                                   self.login_param.clone(),
-//                 );
-//                 args.push(ty_str);
-//             }
-//         }
-//         args
-//     }
-//     //生成签名
-//     fn sign(secret_key: String, channel: String, event: String, time: String) -> String {
-//         let message = format!("channel={}&event={}&time={}", channel, event, time);
-//         let mut mac = Hmac::<Sha512>::new_varkey(secret_key.as_bytes()).expect("Failed to create HMAC");
-//         mac.update(message.as_bytes());
-//         let result = mac.finalize().into_bytes();
-//         let sign = hex::encode(result);
-//         sign
-//     }
-//     /*******************************************************************************************************/
-//     /*****************************************socket基本*****************************************************/
-//     /*******************************************************************************************************/
-//     //链接
-//     pub async fn ws_connect_async(&mut self,
-//                                   is_shutdown_arc: Arc<AtomicBool>,
-//                                   write_tx_am: &Arc<Mutex<UnboundedSender<Message>>>,
-//                                   write_rx: UnboundedReceiver<Message>,
-//                                   read_tx: UnboundedSender<ResponseData>) -> Result<(), Error>
-//     {
-//         let login_is = self.contains_pr();
-//         let subscription = self.get_subscription();
-//         let address_url = self.address_url.clone();
-//         let label = self.label.clone();
-//         let heartbeat_time = self.heartbeat_time.clone();
-//         let timestamp = Utc::now().timestamp();
-//
-//
-//         //心跳-- 方法内部线程启动
-//         let write_tx_clone1 = Arc::clone(write_tx_am);
-//         tokio::spawn(async move {
-//             trace!("线程-异步心跳-开始");
-//             let ping_str = json!({
-//                 "time" : timestamp,
-//                 "channel" : "futures.ping",
-//             });
-//             AbstractWsMode::ping_or_pong(write_tx_clone1, HeartbeatType::Custom(ping_str.to_string()), heartbeat_time).await;
-//             trace!("线程-异步心跳-结束");
-//         });
-//
-//         //设置订阅
-//         let mut subscribe_array = vec![];
-//         if login_is {
-//             //登录相关
-//         }
-//         for s in subscription {
-//             subscribe_array.push(s.to_string());
-//         }
-//
-//         //链接
-//         let t2 = tokio::spawn(async move {
-//             trace!("线程-异步链接-开始");
-//             match AbstractWsMode::ws_connect_async(is_shutdown_arc, address_url.clone(),
-//                                                    label.clone(), subscribe_array,
-//                                                    write_rx, read_tx,
-//                                                    Self::message_text,
-//                                                    Self::message_ping,
-//                                                    Self::message_pong,
-//             ).await {
-//                 Ok(_) => { trace!("线程-异步链接-结束"); }
-//                 Err(e) => { trace!("发生异常:gate-期货链接关闭-{:?}",e); }
-//             }
-//         });
-//         tokio::try_join!(t2).unwrap();
-//         trace!("线程-心跳与链接-结束");
-//
-//         Ok(())
-//     }
-//     /*******************************************************************************************************/
-//     /*****************************************数据解析*****************************************************/
-//     /*******************************************************************************************************/
-//     //数据解析-Text
-//     pub fn message_text(text: String) -> Option<ResponseData> {
-//         let response_data = Self::ok_text(text);
-//         Option::from(response_data)
-//     }
-//     //数据解析-ping
-//     pub fn message_ping(_pi: Vec<u8>) -> Option<ResponseData> {
-//         return Option::from(ResponseData::new("".to_string(), "-300".to_string(), "success".to_string(), "".to_string()));
-//     }
-//     //数据解析-pong
-//     pub fn message_pong(_po: Vec<u8>) -> Option<ResponseData> {
-//         return Option::from(ResponseData::new("".to_string(), "-301".to_string(), "success".to_string(), "".to_string()));
-//     }
-//     //数据解析
-//     pub fn ok_text(text: String) -> ResponseData
-//     {
-//         // trace!("原始数据:{}", text);
-//         let mut res_data = ResponseData::new("".to_string(), "200".to_string(), "success".to_string(), "".to_string());
-//         let json_value: serde_json::Value = serde_json::from_str(&text).unwrap();
-//
-//         if json_value["channel"].as_str() == Option::from("futures.pong") {
-//             res_data.code = "-301".to_string();
-//             res_data.message = "success".to_string();
-//         } else if json_value.get("error").is_some() {
-//             let message = json_value["error"]["message"].as_str().unwrap().to_string();
-//             let mes = message.trim_end_matches('\n');
-//
-//             res_data.code = json_value["error"]["code"].to_string();
-//             res_data.message = mes.to_string();
-//         } else if json_value["result"]["status"].as_str() == Option::from("success") {//订阅返回
-//             res_data.code = "-201".to_string();
-//             res_data.data = text;
-//         } else {
-//             res_data.channel = format!("{}", json_value["channel"].as_str().unwrap());
-//             res_data.code = "200".to_string();
-//             res_data.data = json_value["result"].to_string();
-//         }
-//         res_data
-//     }
-// }
+use std::sync::Arc;
+use std::sync::atomic::AtomicBool;
+use chrono::Utc;
+
+use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
+use hex;
+use hmac::{Hmac, Mac, NewMac};
+use serde_json::{json, Value};
+use sha2::Sha512;
+use tokio::sync::Mutex;
+use tokio_tungstenite::tungstenite::{Error, Message};
+use tracing::{info, trace};
+
+use crate::response_base::ResponseData;
+use crate::socket_tool::{AbstractWsMode, HeartbeatType};
+
+//类型
+pub enum GateSwapWsType {
+    PublicAndPrivate(String),
+}
+
+
+//订阅频道
+#[derive(Clone)]
+pub enum GateSwapSubscribeType {
+    PuFuturesOrderBook,
+    PuFuturesCandlesticks,
+    PuFuturesTrades,
+
+    PrFuturesOrders(String),
+    PrFuturesPositions(String),
+    PrFuturesBalances(String),
+}
+
+//账号信息
+#[derive(Clone)]
+#[allow(dead_code)]
+pub struct GateSwapLogin {
+    pub api_key: String,
+    pub secret: String,
+}
+
+
+#[derive(Clone)]
+pub struct GateSwapWs {
+    //类型
+    label: String,
+    //地址
+    address_url: String,
+    //账号信息
+    login_param: Option<GateSwapLogin>,
+    //币对
+    symbol_s: Vec<String>,
+    //订阅
+    subscribe_types: Vec<GateSwapSubscribeType>,
+    //心跳间隔
+    heartbeat_time: u64,
+}
+
+impl GateSwapWs {
+    /*******************************************************************************************************/
+    /*****************************************获取一个对象****************************************************/
+    /*******************************************************************************************************/
+    pub fn new(is_colo: bool, login_param: Option<GateSwapLogin>, ws_type: GateSwapWsType) -> GateSwapWs {
+        return GateSwapWs::new_label("default-GateSwapWs".to_string(), is_colo, login_param, ws_type);
+    }
+    pub fn new_label(label: String, is_colo: bool, login_param: Option<GateSwapLogin>, ws_type: GateSwapWsType) -> GateSwapWs
+    {
+        /*******公共频道-私有频道数据组装*/
+        let address_url = match ws_type {
+            GateSwapWsType::PublicAndPrivate(name) => {
+                if is_colo {
+                    let url = format!("wss://fxws-private.gateapi.io/v4/ws/{}", name.to_string());
+                    info!("开启高速通道:{:?}",url);
+                    url
+                } else {
+                    let url = format!("wss://fx-ws.gateio.ws/v4/ws/{}", name.to_string());
+                    info!("走普通通道:{}",url);
+                    url
+                }
+            }
+        };
+
+
+        GateSwapWs {
+            label,
+            address_url,
+            login_param,
+            symbol_s: vec![],
+            subscribe_types: vec![],
+            heartbeat_time: 1000 * 10,
+        }
+    }
+
+    /*******************************************************************************************************/
+    /*****************************************订阅函数********************************************************/
+    /*******************************************************************************************************/
+    //手动添加订阅信息
+    pub fn set_subscribe(&mut self, subscribe_types: Vec<GateSwapSubscribeType>) {
+        self.subscribe_types.extend(subscribe_types);
+    }
+    //手动添加币对
+    pub fn set_symbols(&mut self, mut b_array: Vec<String>) {
+        for symbol in b_array.iter_mut() {
+            // 大写
+            *symbol = symbol.to_uppercase();
+            // 字符串替换
+            *symbol = symbol.replace("-", "_");
+        }
+        self.symbol_s = b_array;
+    }
+    //频道是否需要登录
+    fn contains_pr(&self) -> bool {
+        for t in self.subscribe_types.clone() {
+            if match t {
+                GateSwapSubscribeType::PuFuturesOrderBook => false,
+                GateSwapSubscribeType::PuFuturesCandlesticks => false,
+                GateSwapSubscribeType::PuFuturesTrades => false,
+
+                GateSwapSubscribeType::PrFuturesOrders(_) => true,
+                GateSwapSubscribeType::PrFuturesPositions(_) => true,
+                GateSwapSubscribeType::PrFuturesBalances(_) => true,
+            } {
+                return true;
+            }
+        }
+        false
+    }
+
+    /*******************************************************************************************************/
+    /*****************************************工具函数********************************************************/
+    /*******************************************************************************************************/
+    //订阅枚举解析
+    pub fn enum_to_string(symbol: String, subscribe_type: GateSwapSubscribeType, login_param: Option<GateSwapLogin>) -> Value {
+        let time = chrono::Utc::now().timestamp();
+        let mut access_key = "".to_string();
+        let mut secret_key = "".to_string();
+        match login_param {
+            None => {}
+            Some(param) => {
+                access_key = param.api_key.clone();
+                secret_key = param.secret.clone();
+            }
+        }
+
+        match subscribe_type {
+            GateSwapSubscribeType::PuFuturesOrderBook => {
+                json!({
+                    "time": time,
+                    "channel": "futures.order_book",
+                    "event": "subscribe",
+                    "payload":  [symbol, "20", "0"]
+                })
+            }
+            GateSwapSubscribeType::PuFuturesCandlesticks => {
+                json!({
+                    "time": time,
+                    "channel": "futures.candlesticks",
+                    "event": "subscribe",
+                    "payload":  ["1m", symbol]
+                })
+            }
+            GateSwapSubscribeType::PrFuturesOrders(user_id) => {
+                json!({
+                    "time": time,
+                    "channel": "futures.orders",
+                    "event": "subscribe",
+                    "payload":  [user_id, symbol],
+                    "auth": {
+                        "method": "api_key",
+                        "KEY": access_key,
+                        "SIGN":Self::sign(secret_key.to_string(),
+                              "futures.orders".to_string(),
+                              "subscribe".to_string(),
+                              time.to_string())
+                    }
+                })
+            }
+            GateSwapSubscribeType::PrFuturesPositions(user_id) => {
+                json!({
+                    "time": time,
+                    "channel": "futures.positions",
+                    "event": "subscribe",
+                    "payload":  [user_id, symbol],
+                    "auth": {
+                        "method": "api_key",
+                        "KEY": access_key,
+                        "SIGN":Self::sign(secret_key.to_string(),
+                              "futures.positions".to_string(),
+                              "subscribe".to_string(),
+                              time.to_string())
+                    }
+                })
+            }
+            GateSwapSubscribeType::PrFuturesBalances(user_id) => {
+                json!({
+                    "time": time,
+                    "channel": "futures.balances",
+                    "event": "subscribe",
+                    "payload":  [user_id],
+                    "auth": {
+                        "method": "api_key",
+                        "KEY": access_key,
+                        "SIGN":Self::sign(secret_key.to_string(),
+                              "futures.balances".to_string(),
+                              "subscribe".to_string(),
+                              time.to_string())
+                    }
+                })
+            }
+            GateSwapSubscribeType::PuFuturesTrades => {
+                json!({
+                    "time": time,
+                    "channel": "futures.trades",
+                    "event": "subscribe",
+                    "payload":  [symbol]
+                })
+            }
+        }
+    }
+    //订阅信息生成
+    pub fn get_subscription(&self) -> Vec<Value> {
+        let mut args = vec![];
+        for symbol in &self.symbol_s {
+            for subscribe_type in &self.subscribe_types {
+                let ty_str = Self::enum_to_string(symbol.clone(),
+                                                  subscribe_type.clone(),
+                                                  self.login_param.clone(),
+                );
+                args.push(ty_str);
+            }
+        }
+        args
+    }
+    //生成签名
+    fn sign(secret_key: String, channel: String, event: String, time: String) -> String {
+        let message = format!("channel={}&event={}&time={}", channel, event, time);
+        let mut mac = Hmac::<Sha512>::new_varkey(secret_key.as_bytes()).expect("Failed to create HMAC");
+        mac.update(message.as_bytes());
+        let result = mac.finalize().into_bytes();
+        let sign = hex::encode(result);
+        sign
+    }
+    /*******************************************************************************************************/
+    /*****************************************socket基本*****************************************************/
+    /*******************************************************************************************************/
+    //链接
+    pub async fn ws_connect_async<F, Future>(&mut self,
+                                             is_shutdown_arc: Arc<AtomicBool>,
+                                             handle_function: F,
+                                             write_tx_am: &Arc<Mutex<UnboundedSender<Message>>>,
+                                             write_rx: UnboundedReceiver<Message>) -> Result<(), Error>
+        where
+            F: Fn(ResponseData) -> Future + Clone + Send + 'static + Sync,
+            Future: std::future::Future<Output=()> + Send + 'static, // 确保 Fut 是一个 Future,且输出类型为 ()
+    {
+        let login_is = self.contains_pr();
+        let subscription = self.get_subscription();
+        let address_url = self.address_url.clone();
+        let label = self.label.clone();
+        let heartbeat_time = self.heartbeat_time.clone();
+        let timestamp = Utc::now().timestamp();
+
+        //心跳-- 方法内部线程启动
+        let write_tx_clone1 = Arc::clone(write_tx_am);
+        tokio::spawn(async move {
+            trace!("线程-异步心跳-开始");
+            let ping_str = json!({
+                "time" : timestamp,
+                "channel" : "futures.ping",
+            });
+            AbstractWsMode::ping_or_pong(write_tx_clone1, HeartbeatType::Custom(ping_str.to_string()), heartbeat_time).await;
+            trace!("线程-异步心跳-结束");
+        });
+
+        //设置订阅
+        let mut subscribe_array = vec![];
+        if login_is {
+            //登录相关
+        }
+        for s in subscription {
+            subscribe_array.push(s.to_string());
+        }
+
+        //链接
+        let t2 = tokio::spawn(async move {
+            trace!("线程-异步链接-开始");
+            match AbstractWsMode::ws_connect_async(is_shutdown_arc,
+                                                   handle_function,
+                                                   address_url.clone(),
+                                                   label.clone(),
+                                                   subscribe_array,
+                                                   write_rx,
+                                                   Self::message_text,
+                                                   Self::message_ping,
+                                                   Self::message_pong,
+            ).await {
+                Ok(_) => { trace!("线程-异步链接-结束"); }
+                Err(e) => { trace!("发生异常:gate-期货链接关闭-{:?}",e); }
+            }
+        });
+        tokio::try_join!(t2).unwrap();
+        trace!("线程-心跳与链接-结束");
+
+        Ok(())
+    }
+    /*******************************************************************************************************/
+    /*****************************************数据解析*****************************************************/
+    /*******************************************************************************************************/
+    //数据解析-Text
+    pub fn message_text(text: String) -> Option<ResponseData> {
+        let response_data = Self::ok_text(text);
+        Option::from(response_data)
+    }
+    //数据解析-ping
+    pub fn message_ping(_pi: Vec<u8>) -> Option<ResponseData> {
+        return Option::from(ResponseData::new("".to_string(), "-300".to_string(), "success".to_string(), "".to_string()));
+    }
+    //数据解析-pong
+    pub fn message_pong(_po: Vec<u8>) -> Option<ResponseData> {
+        return Option::from(ResponseData::new("".to_string(), "-301".to_string(), "success".to_string(), "".to_string()));
+    }
+    //数据解析
+    pub fn ok_text(text: String) -> ResponseData
+    {
+        // trace!("原始数据:{}", text);
+        let mut res_data = ResponseData::new("".to_string(), "200".to_string(), "success".to_string(), "".to_string());
+        let json_value: serde_json::Value = serde_json::from_str(&text).unwrap();
+
+        if json_value["channel"].as_str() == Option::from("futures.pong") {
+            res_data.code = "-301".to_string();
+            res_data.message = "success".to_string();
+        } else if json_value.get("error").is_some() {
+            let message = json_value["error"]["message"].as_str().unwrap().to_string();
+            let mes = message.trim_end_matches('\n');
+
+            res_data.code = json_value["error"]["code"].to_string();
+            res_data.message = mes.to_string();
+        } else if json_value["result"]["status"].as_str() == Option::from("success") {//订阅返回
+            res_data.code = "-201".to_string();
+            res_data.data = text;
+        } else {
+            res_data.channel = format!("{}", json_value["channel"].as_str().unwrap());
+            res_data.code = "200".to_string();
+            res_data.data = json_value["result"].to_string();
+        }
+        res_data
+    }
+}

+ 8 - 8
src/core_libs.rs

@@ -36,7 +36,7 @@ pub async fn init(params: Params,
                                    running.clone(),
                                    cci_arc.clone()).await;
     let ref_name = core_obj.ref_name[0].clone();
-    // let trade_name = core_obj.trade_name.clone();
+    let trade_name = core_obj.trade_name.clone();
 
     info!("core初始化……");
     core_obj.before_trade().await;
@@ -51,13 +51,13 @@ pub async fn init(params: Params,
                                               params.colo != 0i8,
                                               exchange_params.clone()).await;
     // 交易交易所
-    // exchange_disguise::run_transactional_exchange(ws_running.clone(),
-    //                                               params.exchange,
-    //                                               core_arc.clone(),
-    //                                               trade_name,
-    //                                               vec![params.pair.clone()],
-    //                                               params.colo != 0i8,
-    //                                               exchange_params.clone()).await;
+    exchange_disguise::run_transactional_exchange(ws_running.clone(),
+                                                  params.exchange,
+                                                  core_arc.clone(),
+                                                  trade_name,
+                                                  vec![params.pair.clone()],
+                                                  params.colo != 0i8,
+                                                  exchange_params.clone()).await;
     // 启动定期触发的系统逻辑
     core::on_timer(core_arc.clone());
     // 启动策略逻辑

+ 5 - 4
strategy/src/binance_usdt_swap.rs

@@ -30,8 +30,7 @@ pub(crate) async fn reference_binance_swap_run(is_shutdown_arc: Arc<AtomicBool>,
             // BinanceSwapSubscribeType::PuAggTrade
         ]);
 
-
-        //读取数据
+        // 读取数据
         let mut update_flag_u = Decimal::ZERO;
         let core_arc_clone = Arc::clone(&core_arc);
         let fun = move |data| {
@@ -39,7 +38,9 @@ pub(crate) async fn reference_binance_swap_run(is_shutdown_arc: Arc<AtomicBool>,
             let core_arc_clone = core_arc_clone.clone();
             async move {
                 // 使用克隆后的 Arc,避免 move 语义
-                on_data(core_arc_clone, &mut update_flag_u, data).await
+                on_data(core_arc_clone,
+                        &mut update_flag_u,
+                        data).await
             }
         };
 
@@ -56,7 +57,7 @@ async fn on_data(core_arc_clone: Arc<Mutex<Core>>,
     trace_stack.on_after_network(data.time);
     trace_stack.on_before_unlock_core();
 
-    info!("{}", trace_stack.to_string());
+    info!("bian: {}", trace_stack.to_string());
 
     if data.code != "200".to_string() {
         return;

+ 33 - 33
strategy/src/exchange_disguise.rs

@@ -7,7 +7,7 @@ use global::trace_stack::TraceStack;
 use standard::SpecialDepth;
 
 use crate::binance_usdt_swap::reference_binance_swap_run;
-// use crate::gate_swap::gate_swap_run;
+use crate::gate_swap::gate_swap_run;
 // use crate::binance_spot::reference_binance_spot_run;
 // use crate::bitget_spot::bitget_spot_run;
 // use crate::bybit_usdt_swap::bybit_swap_run;
@@ -17,35 +17,35 @@ use crate::binance_usdt_swap::reference_binance_swap_run;
 use crate::core::Core;
 
 // 交易交易所启动
-// pub async fn run_transactional_exchange(is_shutdown_arc :Arc<AtomicBool>,
-//                                         exchange_name: String,
-//                                         core_arc: Arc<Mutex<Core>>,
-//                                         name: String,
-//                                         symbols: Vec<String>,
-//                                         is_colo: bool,
-//                                         exchange_params: BTreeMap<String, String>) {
-//     match exchange_name.as_str() {
-//         "gate_usdt_swap" => {
-//             gate_swap_run(is_shutdown_arc, true, core_arc, name, symbols, is_colo, exchange_params).await;
-//         }
-//         // "kucoin_usdt_swap" => {
-//         //     kucoin_swap_run(is_shutdown_arc, true, core_arc, name, symbols, is_colo, exchange_params).await;
-//         // },
-//         // "okex_usdt_swap" => {
-//         //     okex_swap_run(is_shutdown_arc,true, core_arc, name, symbols, is_colo, exchange_params).await;
-//         // },
-//         // "bitget_spot" => {
-//         //     bitget_spot_run(is_shutdown_arc,true, core_arc, name, symbols, is_colo, exchange_params).await;
-//         // },
-//         // "bybit_usdt_swap" => {
-//         //     bybit_swap_run(is_shutdown_arc,true, core_arc, name, symbols, is_colo, exchange_params).await;
-//         // }
-//         _ => {
-//             let msg = format!("不支持的交易交易所:{}", exchange_name);
-//             panic!("{}", msg);
-//         }
-//     }
-// }
+pub async fn run_transactional_exchange(is_shutdown_arc :Arc<AtomicBool>,
+                                        exchange_name: String,
+                                        core_arc: Arc<Mutex<Core>>,
+                                        name: String,
+                                        symbols: Vec<String>,
+                                        is_colo: bool,
+                                        exchange_params: BTreeMap<String, String>) {
+    match exchange_name.as_str() {
+        "gate_usdt_swap" => {
+            gate_swap_run(is_shutdown_arc, true, core_arc, name, symbols, is_colo, exchange_params).await;
+        }
+        // "kucoin_usdt_swap" => {
+        //     kucoin_swap_run(is_shutdown_arc, true, core_arc, name, symbols, is_colo, exchange_params).await;
+        // },
+        // "okex_usdt_swap" => {
+        //     okex_swap_run(is_shutdown_arc,true, core_arc, name, symbols, is_colo, exchange_params).await;
+        // },
+        // "bitget_spot" => {
+        //     bitget_spot_run(is_shutdown_arc,true, core_arc, name, symbols, is_colo, exchange_params).await;
+        // },
+        // "bybit_usdt_swap" => {
+        //     bybit_swap_run(is_shutdown_arc,true, core_arc, name, symbols, is_colo, exchange_params).await;
+        // }
+        _ => {
+            let msg = format!("不支持的交易交易所:{}", exchange_name);
+            panic!("{}", msg);
+        }
+    }
+}
 
 // 参考交易所启动
 pub async fn run_reference_exchange(is_shutdown_arc: Arc<AtomicBool>,
@@ -62,9 +62,9 @@ pub async fn run_reference_exchange(is_shutdown_arc: Arc<AtomicBool>,
         // "binance_spot" => {
         //     reference_binance_spot_run(is_shutdown_arc, core_arc, name, symbols, is_colo, exchange_params).await;
         // },
-        // "gate_usdt_swap" => {
-        //     gate_swap_run(is_shutdown_arc, false, core_arc, name, symbols, is_colo, exchange_params).await;
-        // },
+        "gate_usdt_swap" => {
+            gate_swap_run(is_shutdown_arc, false, core_arc, name, symbols, is_colo, exchange_params).await;
+        },
         // "okex_usdt_swap" => {
         //     okex_swap_run(is_shutdown_arc, false, core_arc, name, symbols, is_colo, exchange_params).await;
         // },

+ 184 - 185
strategy/src/gate_swap.rs

@@ -1,185 +1,184 @@
-// use std::collections::BTreeMap;
-// use std::sync::Arc;
-// use std::sync::atomic::AtomicBool;
-// use futures_util::StreamExt;
-// use rust_decimal::Decimal;
-// use serde_json::Value;
-// use tokio::spawn;
-// use tokio::sync::Mutex;
-// use tracing::info;
-// use exchanges::gate_swap_rest::GateSwapRest;
-// use exchanges::gate_swap_ws::{GateSwapLogin, GateSwapSubscribeType, GateSwapWs, GateSwapWsType};
-// use exchanges::response_base::ResponseData;
-// use global::trace_stack::TraceStack;
-// use standard::exchange::ExchangeEnum::GateSwap;
-// use crate::model::{OrderInfo, OriginalTradeGa};
-// use crate::core::Core;
-// use crate::exchange_disguise::on_special_depth;
-//
-// // 1交易、0参考 gate 合约 启动
-// pub async fn gate_swap_run(is_shutdown_arc: Arc<AtomicBool>,
-//                            is_trade: bool,
-//                            core_arc: Arc<Mutex<Core>>,
-//                            name: String,
-//                            symbols: Vec<String>,
-//                            is_colo: bool,
-//                            exchange_params: BTreeMap<String, String>) {
-//     let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
-//     let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded();
-//
-//     let mut gate_exc = GateSwapRest::new(is_colo, exchange_params.clone());
-//     let mut user_id= "".to_string();
-//
-//     // 交易
-//     if is_trade {
-//         // 获取user_id
-//         let res_data = gate_exc.wallet_fee().await;
-//         assert_eq!(res_data.code, "200", "获取gate交易所参数 user_id 失败, 启动失败!");
-//
-//         let wallet_obj :Value = serde_json::from_str(&res_data.data).unwrap();
-//         info!(?wallet_obj);
-//         user_id = wallet_obj["user_id"].to_string();
-//     }
-//
-//     let write_tx_am = Arc::new(Mutex::new(write_tx));
-//     let symbols_clone = symbols.clone();
-//     spawn(async move {
-//         let mut ws;
-//         // 交易
-//         if is_trade {
-//             let login_param = parse_btree_map_to_gate_swap_login(exchange_params);
-//             ws = GateSwapWs::new_label(name.clone(), is_colo, Some(login_param),
-//                                        GateSwapWsType::PublicAndPrivate("usdt".to_string()));
-//             ws.set_subscribe(vec![
-//                 // GateSwapSubscribeType::PuFuturesTrades,
-//                 GateSwapSubscribeType::PuFuturesOrderBook,
-//                 GateSwapSubscribeType::PrFuturesOrders(user_id.clone()),
-//                 GateSwapSubscribeType::PrFuturesPositions(user_id.clone()),
-//                 GateSwapSubscribeType::PrFuturesBalances(user_id.clone()),
-//             ]);
-//         } else { // 参考
-//             ws = GateSwapWs::new_label(name.clone(), is_colo, None,
-//                                        GateSwapWsType::PublicAndPrivate("usdt".to_string()));
-//             ws.set_subscribe(vec![
-//                 GateSwapSubscribeType::PuFuturesTrades,
-//                 GateSwapSubscribeType::PuFuturesOrderBook
-//             ]);
-//         }
-//
-//         ws.set_symbols(symbols_clone);
-//         ws.ws_connect_async(is_shutdown_arc, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
-//     });
-//
-//     spawn(async move {
-//         let core_arc_clone = Arc::clone(&core_arc);
-//         let run_symbol = symbols.clone()[0].clone();
-//         // trade
-//         let mut update_flag_u = Decimal::ZERO;
-//         let mut max_buy = Decimal::ZERO;
-//         let mut min_sell = Decimal::ZERO;
-//         let multiplier = core_arc_clone.lock().await.platform_rest.get_self_market().amount_size;
-//
-//         loop {
-//             while let Some(data) = read_rx.next().await {
-//                 on_data(core_arc_clone.clone(),
-//                         &mut update_flag_u,
-//                         multiplier,
-//                         run_symbol.clone(),
-//                         &mut max_buy,
-//                         &mut min_sell,
-//                         data).await;
-//             }
-//         }
-//     });
-// }
-//
-// async fn on_data(core_arc_clone: Arc<Mutex<Core>>,
-//                  update_flag_u: &mut Decimal,
-//                  multiplier: Decimal,
-//                  run_symbol: String,
-//                  max_buy: &mut Decimal,
-//                  min_sell: &mut Decimal,
-//                  data: ResponseData) {
-//     let mut trace_stack = TraceStack::default();
-//     trace_stack.on_after_network(data.time);
-//     trace_stack.on_before_unlock_core();
-//
-//     if data.code != "200".to_string() {
-//         return;
-//     }
-//
-//     if data.channel == "futures.order_book" {
-//         trace_stack.on_before_format();
-//         let depth = standard::handle_info::HandleSwapInfo::handle_special_depth(GateSwap, data.clone());
-//         trace_stack.on_before_network(depth.create_at.clone());
-//         trace_stack.on_after_format();
-//
-//         on_special_depth(core_arc_clone, update_flag_u, data.label, trace_stack, depth).await;
-//     } else if data.channel == "futures.balances" {
-//         let account = standard::handle_info::HandleSwapInfo::handle_account_info(GateSwap, data, run_symbol.clone());
-//         {
-//             let mut core = core_arc_clone.lock().await;
-//             core.update_equity(account).await;
-//         }
-//     } else if data.channel == "futures.orders" {
-//         trace_stack.on_before_format();
-//         let orders = standard::handle_info::HandleSwapInfo::handle_order(GateSwap, data.clone(), multiplier.clone());
-//         trace_stack.on_after_format();
-//
-//         let mut order_infos:Vec<OrderInfo> = Vec::new();
-//         for order in orders.order {
-//             let order_info = OrderInfo {
-//                 symbol: "".to_string(),
-//                 amount: order.amount.abs(),
-//                 side: "".to_string(),
-//                 price: order.price,
-//                 client_id: order.custom_id,
-//                 filled_price: order.avg_price,
-//                 filled: order.deal_amount.abs(),
-//                 order_id: order.id,
-//                 local_time: 0,
-//                 create_time: 0,
-//                 status: order.status,
-//                 fee: Default::default(),
-//                 trace_stack: Default::default(),
-//             };
-//             order_infos.push(order_info);
-//         }
-//
-//         {
-//             let mut core = core_arc_clone.lock().await;
-//             core.update_order(order_infos, trace_stack);
-//         }
-//     } else if data.channel == "futures.positions" {
-//         let positions = standard::handle_info::HandleSwapInfo::handle_position(GateSwap,data, multiplier.clone());
-//         {
-//             let mut core = core_arc_clone.lock().await;
-//             core.update_position(positions).await;
-//         }
-//     } else if data.channel == "futures.trades" {
-//         let mut core = core_arc_clone.lock().await;
-//         let str = data.label.clone();
-//         if core.is_update.contains_key(&data.label) && *core.is_update.get(str.as_str()).unwrap(){
-//             *max_buy = Decimal::ZERO;
-//             *min_sell = Decimal::ZERO;
-//             core.is_update.remove(str.as_str());
-//         }
-//         let trades: Vec<OriginalTradeGa> = serde_json::from_str(data.data.as_str()).unwrap();
-//         for trade in trades {
-//             if trade.price > *max_buy || *max_buy == Decimal::ZERO{
-//                 *max_buy = trade.price
-//             }
-//             if trade.price < *min_sell || *min_sell == Decimal::ZERO{
-//                 *min_sell = trade.price
-//             }
-//         }
-//         core.max_buy_min_sell_cache.insert(data.label, vec![*max_buy, *min_sell]);
-//     }
-// }
-//
-// fn parse_btree_map_to_gate_swap_login(exchange_params: BTreeMap<String, String>) -> GateSwapLogin {
-//     GateSwapLogin {
-//         api_key: exchange_params.get("access_key").unwrap().clone(),
-//         secret: exchange_params.get("secret_key").unwrap().clone()
-//     }
-// }
+use std::collections::BTreeMap;
+use std::sync::Arc;
+use std::sync::atomic::AtomicBool;
+use rust_decimal::Decimal;
+use serde_json::Value;
+use tokio::spawn;
+use tokio::sync::Mutex;
+use tracing::info;
+use exchanges::gate_swap_rest::GateSwapRest;
+use exchanges::gate_swap_ws::{GateSwapLogin, GateSwapSubscribeType, GateSwapWs, GateSwapWsType};
+use exchanges::response_base::ResponseData;
+use global::trace_stack::TraceStack;
+use standard::exchange::ExchangeEnum::GateSwap;
+use crate::model::{OrderInfo};
+use crate::core::Core;
+use crate::exchange_disguise::on_special_depth;
+
+// 1交易、0参考 gate 合约 启动
+pub async fn gate_swap_run(is_shutdown_arc: Arc<AtomicBool>,
+                           is_trade: bool,
+                           core_arc: Arc<Mutex<Core>>,
+                           name: String,
+                           symbols: Vec<String>,
+                           is_colo: bool,
+                           exchange_params: BTreeMap<String, String>) {
+    let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
+
+    let mut gate_exc = GateSwapRest::new(is_colo, exchange_params.clone());
+    let mut user_id= "".to_string();
+
+    // 交易
+    if is_trade {
+        // 获取user_id
+        let res_data = gate_exc.wallet_fee().await;
+        assert_eq!(res_data.code, "200", "获取gate交易所参数 user_id 失败, 启动失败!");
+
+        let wallet_obj :Value = serde_json::from_str(&res_data.data).unwrap();
+        info!(?wallet_obj);
+        user_id = wallet_obj["user_id"].to_string();
+    }
+
+    let write_tx_am = Arc::new(Mutex::new(write_tx));
+    let symbols_clone = symbols.clone();
+    spawn(async move {
+        let mut ws;
+        // 交易
+        if is_trade {
+            let login_param = parse_btree_map_to_gate_swap_login(exchange_params);
+            ws = GateSwapWs::new_label(name.clone(), is_colo, Some(login_param),
+                                       GateSwapWsType::PublicAndPrivate("usdt".to_string()));
+            ws.set_subscribe(vec![
+                // GateSwapSubscribeType::PuFuturesTrades,
+                GateSwapSubscribeType::PuFuturesOrderBook,
+                GateSwapSubscribeType::PrFuturesOrders(user_id.clone()),
+                GateSwapSubscribeType::PrFuturesPositions(user_id.clone()),
+                GateSwapSubscribeType::PrFuturesBalances(user_id.clone()),
+            ]);
+        } else { // 参考
+            ws = GateSwapWs::new_label(name.clone(), is_colo, None,
+                                       GateSwapWsType::PublicAndPrivate("usdt".to_string()));
+            ws.set_subscribe(vec![
+                GateSwapSubscribeType::PuFuturesTrades,
+                GateSwapSubscribeType::PuFuturesOrderBook
+            ]);
+        }
+
+        // 读取数据
+        let mut update_flag_u = Decimal::ZERO;
+        let core_arc_clone = Arc::clone(&core_arc);
+        let multiplier = core_arc_clone.lock().await.platform_rest.get_self_market().amount_size;
+        let run_symbol = symbols.clone()[0].clone();
+        let fun = move |data| {
+            // 在 async 块之前克隆 Arc
+            let core_arc_clone = core_arc_clone.clone();
+            let mul = multiplier.clone();
+            let rs = run_symbol.clone();
+
+            async move {
+                // 使用克隆后的 Arc,避免 move 语义
+                on_data(core_arc_clone,
+                        &mut update_flag_u,
+                        &mul,
+                        &rs,
+                        data,
+                ).await
+            }
+        };
+
+        // 建立链接
+        ws.set_symbols(symbols_clone);
+        ws.ws_connect_async(is_shutdown_arc, fun, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+    });
+}
+
+async fn on_data(core_arc_clone: Arc<Mutex<Core>>,
+                 update_flag_u: &mut Decimal,
+                 multiplier: &Decimal,
+                 run_symbol: &String,
+                 data: ResponseData) {
+    let mut trace_stack = TraceStack::default();
+    trace_stack.on_after_network(data.time);
+    trace_stack.on_before_unlock_core();
+
+    info!("gate: {}", trace_stack.to_string());
+
+    if data.code != "200".to_string() {
+        return;
+    }
+
+    if data.channel == "futures.order_book" {
+        trace_stack.on_before_format();
+        let depth = standard::handle_info::HandleSwapInfo::handle_special_depth(GateSwap, data.clone());
+        trace_stack.on_before_network(depth.create_at.clone());
+        trace_stack.on_after_format();
+
+        on_special_depth(core_arc_clone, update_flag_u, data.label, trace_stack, depth).await;
+    } else if data.channel == "futures.balances" {
+        let account = standard::handle_info::HandleSwapInfo::handle_account_info(GateSwap, data, run_symbol.clone());
+        {
+            let mut core = core_arc_clone.lock().await;
+            core.update_equity(account).await;
+        }
+    } else if data.channel == "futures.orders" {
+        trace_stack.on_before_format();
+        let orders = standard::handle_info::HandleSwapInfo::handle_order(GateSwap, data.clone(), multiplier.clone());
+        trace_stack.on_after_format();
+
+        let mut order_infos:Vec<OrderInfo> = Vec::new();
+        for order in orders.order {
+            let order_info = OrderInfo {
+                symbol: "".to_string(),
+                amount: order.amount.abs(),
+                side: "".to_string(),
+                price: order.price,
+                client_id: order.custom_id,
+                filled_price: order.avg_price,
+                filled: order.deal_amount.abs(),
+                order_id: order.id,
+                local_time: 0,
+                create_time: 0,
+                status: order.status,
+                fee: Default::default(),
+                trace_stack: Default::default(),
+            };
+            order_infos.push(order_info);
+        }
+
+        {
+            let mut core = core_arc_clone.lock().await;
+            core.update_order(order_infos, trace_stack);
+        }
+    } else if data.channel == "futures.positions" {
+        let positions = standard::handle_info::HandleSwapInfo::handle_position(GateSwap,data, multiplier.clone());
+        {
+            let mut core = core_arc_clone.lock().await;
+            core.update_position(positions).await;
+        }
+    } else if data.channel == "futures.trades" {
+        // let mut core = core_arc_clone.lock().await;
+        // let str = data.label.clone();
+        // if core.is_update.contains_key(&data.label) && *core.is_update.get(str.as_str()).unwrap(){
+        //     *max_buy = Decimal::ZERO;
+        //     *min_sell = Decimal::ZERO;
+        //     core.is_update.remove(str.as_str());
+        // }
+        // let trades: Vec<OriginalTradeGa> = serde_json::from_str(data.data.as_str()).unwrap();
+        // for trade in trades {
+        //     if trade.price > *max_buy || *max_buy == Decimal::ZERO{
+        //         *max_buy = trade.price
+        //     }
+        //     if trade.price < *min_sell || *min_sell == Decimal::ZERO{
+        //         *min_sell = trade.price
+        //     }
+        // }
+        // core.max_buy_min_sell_cache.insert(data.label, vec![*max_buy, *min_sell]);
+    }
+}
+
+fn parse_btree_map_to_gate_swap_login(exchange_params: BTreeMap<String, String>) -> GateSwapLogin {
+    GateSwapLogin {
+        api_key: exchange_params.get("access_key").unwrap().clone(),
+        secret: exchange_params.get("secret_key").unwrap().clone()
+    }
+}

+ 2 - 1
tests/framework_3_0_test.rs

@@ -2,6 +2,7 @@ use std::future::Future;
 use std::time::Duration;
 use exchanges::response_base::ResponseData;
 use std::sync::Arc;
+use rust_decimal::Decimal;
 use tokio::sync::Mutex;
 use tracing::info;
 use global::log_utils::init_log_with_info;
@@ -33,7 +34,7 @@ async fn framework_3_0() {
 // 消息创造者
 async fn generator<F, Fut>(handle_function: F)
 where
-    F: Fn(ResponseData) -> Fut,
+    F: Fn(ResponseData, Decimal, String) -> Fut,
     Fut: Future<Output = ()> + Send + 'static, // 确保 Fut 是一个 Future,且输出类型为 ()
 {
     let data = ResponseData::new("aaa".to_string(),