Bläddra i källkod

coinex的加入。

skyffire 1 år sedan
förälder
incheckning
dab13a0b3a

+ 615 - 615
exchanges/src/coinex_swap_rest.rs

@@ -1,615 +1,615 @@
-// use std::collections::BTreeMap;
-// use std::error::Error;
-// use std::time::{SystemTime, UNIX_EPOCH};
-// use reqwest::header::{HeaderMap, HeaderValue};
-// use hex;
-// use reqwest::Client;
-// use rust_decimal::Decimal;
-// use rust_decimal_macros::dec;
-// use serde_json::Value;
-// use crate::http_tool::RestTool;
-// use crate::response_base::ResponseData;
-// use sha2::{Digest, Sha256};
-// use tracing::{error};
-//
-// #[derive(Clone)]
-// pub struct CoinexSwapRest {
-//     label: String,
-//     base_url: String,
-//     client: Client,
-//     /*******参数*/
-//     //登陆所需参数
-//     login_param: BTreeMap<String, String>,
-//     delays: Vec<i64>,
-//     max_delay: i64,
-//     avg_delay: Decimal,
-// }
-//
-// impl CoinexSwapRest {
-//     /*******************************************************************************************************/
-//     /*****************************************获取一个对象****************************************************/
-//     /*******************************************************************************************************/
-//     pub fn new(login_param: BTreeMap<String, String>) -> CoinexSwapRest
-//     {
-//         return CoinexSwapRest::new_label("default-CoinexSwapRest".to_string(), login_param);
-//     }
-//     pub fn new_label(label: String, login_param: BTreeMap<String, String>) -> CoinexSwapRest
-//     {
-//         let base_url: String = String::from("https://api.coinex.com");
-//
-//         /*****返回结构体*******/
-//         CoinexSwapRest {
-//             label,
-//             base_url,
-//             client: Client::new(),
-//             login_param,
-//             delays: vec![],
-//             max_delay: 0,
-//             avg_delay: dec!(0.0),
-//         }
-//     }
-//
-//     /*******************************************************************************************************/
-//     /*****************************************rest请求函数********************************************************/
-//     /*******************************************************************************************************/
-//     //获取服务器当前时间
-//     pub async fn get_server_time(&mut self) -> ResponseData {
-//
-//         let data = self.request("GET".to_string(),
-//                                 "/v2".to_string(),
-//                                 "/time".to_string(),
-//                                 false,
-//                                 None,
-//                                 None
-//         ).await;
-//         data
-//     }
-//     //查询个人交易费率
-//     pub async fn wallet_fee(&mut self) -> ResponseData {
-//         let data = self.request("GET".to_string(),
-//                                 "/v2".to_string(),
-//                                 "/account/trade-fee-rate".to_string(),
-//                                 true,
-//                                 None,
-//                                 None
-//         ).await;
-//         data
-//     }
-//     //查询合约账户
-//     pub async fn get_account(&mut self) -> ResponseData {
-//         let data = self.request("GET".to_string(),
-//                                 "/v2".to_string(),
-//                                 "/assets/futures/balance".to_string(),
-//                                 true,
-//                                 None,
-//                                 None
-//         ).await;
-//         data
-//     }
-//
-//     //指定币对仓位列表
-//     pub async fn get_position(&mut self, market: String) -> ResponseData {
-//         let params = serde_json::json!({
-//             "market": market,
-//             "market_type": "FUTURES"
-//         });
-//         let data = self.request("GET".to_string(),
-//                                 "/v2".to_string(),
-//                                 "/futures/pending-position".to_string(),
-//                                 true,
-//                                 Some(params.to_string()),
-//                                 None
-//         ).await;
-//         data
-//     }
-//
-//     //用户仓位列表
-//     pub async fn get_user_position(&mut self) -> ResponseData {
-//         let params = serde_json::json!({
-//             "market_type": "FUTURES"
-//         });
-//         let data = self.request("GET".to_string(),
-//                                 "/v2".to_string(),
-//                                 "/futures/pending-position".to_string(),
-//                                 true,
-//                                 Some(params.to_string()),
-//                                 None
-//         ).await;
-//         data
-//     }
-//
-//     //获取所有合约交易行情统计 market 市场名列表,多个市场名之间使用英文","分隔,空字符串或不传表示查询全部市场,限制最多10个市场
-//     pub async fn get_ticker(&mut self, market: String) -> ResponseData {
-//         let params = serde_json::json!({
-//             "market": market
-//          });
-//         let data = self.request("GET".to_string(),
-//                                 "/v2".to_string(),
-//                                 "/futures/ticker".to_string(),
-//                                 false,
-//                                 Some(params.to_string()),
-//                                 None
-//         ).await;
-//         data
-//     }
-//     //查询所有的合约信息
-//     pub async fn get_market_details(&mut self, market: String) -> ResponseData {
-//         let params = serde_json::json!({
-//             "market": market
-//          });
-//         let data = self.request("GET".to_string(),
-//                                 "/v2".to_string(),
-//                                 "/futures/market".to_string(),
-//                                 false,
-//                                 Some(params.to_string()),
-//                                 None
-//         ).await;
-//         data
-//     }
-//     //查询单个订单详情  /spot/order-status?market=CETUSDT&order_id=13400
-//     pub async fn get_order_details(&mut self, order_id: String, market: String) -> ResponseData {
-//         let params = serde_json::json!({
-//             "market": market,
-//             "order_id": order_id
-//          });
-//         let data = self.request("GET".to_string(),
-//                                 "/v2".to_string(),
-//                                 "/futures/order-status".to_string(),
-//                                 true,
-//                                 Some(params.to_string()),
-//                                 None
-//         ).await;
-//         data
-//     }
-//     //查询未完成合约订单 /futures/pending-order?market=CETUSDT&market_type=FUTURES&side=buy&page=1&limit=10
-//     pub async fn get_pending_order(&mut self, client_id: String) -> ResponseData {
-//         let params = serde_json::json!({
-//             "market_type": "FUTURES",
-//             "client_id": client_id,
-//             "limit": 10
-//          });
-//         let data = self.request("GET".to_string(),
-//                                 "/v2".to_string(),
-//                                 "/futures/pending-order".to_string(),
-//                                 true,
-//                                 Some(params.to_string()),
-//                                 None
-//         ).await;
-//         data
-//     }
-//
-//     pub async fn get_pending_orders(&mut self) -> ResponseData {
-//         let params = serde_json::json!({
-//             "market_type": "FUTURES",
-//             "limit": 100
-//          });
-//         let data = self.request("GET".to_string(),
-//                                 "/v2".to_string(),
-//                                 "/futures/pending-order".to_string(),
-//                                 true,
-//                                 Some(params.to_string()),
-//                                 None
-//         ).await;
-//         data
-//     }
-//
-//     pub async fn get_finished_orders(&mut self) -> ResponseData {
-//         let params = serde_json::json!({
-//             "market_type": "FUTURES",
-//             "limit": 100
-//          });
-//         let data = self.request("GET".to_string(),
-//                                 "/v2".to_string(),
-//                                 "/futures/finished-order".to_string(),
-//                                 true,
-//                                 Some(params.to_string()),
-//                                 None
-//         ).await;
-//         data
-//     }
-//
-//     //下单
-//     //  coinex swap 平仓需考虑最小下单量 只能通过close_position和position_id来平仓
-//     pub async fn order(&mut self,
-//                        market: String,
-//                        pos_side: String,
-//                        side: String,
-//                        size: Decimal,
-//                        price: Decimal,
-//                        client_id: String
-//     ) -> ResponseData
-//     {
-//         // 默认为限价单
-//         let mut type_y = "limit".to_string();
-//         // 0为市价单,
-//         if price == Decimal::ZERO {
-//             type_y = "market".to_string();
-//         }
-//         let data;
-//
-//
-//         match format!("{}_{}", pos_side, side).as_str() {
-//             "long_buy" => {//开多
-//                 data = self.swap_order(market, side, type_y, size, price, client_id, false).await;
-//             }
-//             "long_sell" => {//平多
-//                 data = self.close_position(market, type_y,  price, client_id, false).await;
-//             }
-//             "short_buy" => {//平空
-//                 data = self.close_position(market, type_y, price, client_id, false).await;
-//             }
-//             "short_sell" => {//开空
-//                 data = self.swap_order(market, side, type_y, size, price, client_id, false).await;
-//             }
-//             _ => {// 处理未知请求类型
-//                 error!("下单失败,数量异常! size: {}", size);
-//                 data = ResponseData::error(self.label.clone(), format!("下单失败, 下单参数: <market: {:?}, pos_side: {:?}, side: {:?}, size: {}, price: {:?}, client_id: {:?}>", market, pos_side, side, size, price, client_id));
-//             }
-//         };
-//         data
-//     }
-//
-//     // 平仓下单
-//     pub async fn close_position(&mut self, market: String, type_y : String,  price: Decimal, client_id: String, is_hide: bool) -> ResponseData {
-//         // 数量不传为全平
-//         let param = serde_json::json!({
-//             "market":market,
-//             "market_type": "FUTURES",
-//             "type": type_y,
-//             "price":price,
-//             "client_id":client_id,
-//             "is_hide": is_hide
-//         });
-//
-//         let data = self.request("POST".to_string(),
-//                                 "/v2".to_string(),
-//                                 "/futures/close-position".to_string(),
-//                                 true,
-//                                 None,
-//                                 Some(param.to_string())
-//         ).await;
-//         data
-//     }
-//
-//     //合约交易开仓下单
-//     pub async fn swap_order(&mut self, market: String, side: String, type_y : String, amount: Decimal, price: Decimal, client_id: String, is_hide: bool) -> ResponseData {
-//         let param = serde_json::json!({
-//             "market":market,
-//             "market_type": "FUTURES",
-//             "side": side,
-//             "type": type_y,
-//             "amount":amount,
-//             "price":price,
-//             "client_id":client_id,
-//             "is_hide": is_hide
-//         });
-//
-//         let data = self.request("POST".to_string(),
-//                                 "/v2".to_string(),
-//                                 "/futures/order".to_string(),
-//                                 true,
-//                                 None,
-//                                 Some(param.to_string())
-//         ).await;
-//         data
-//     }
-//
-//     //设置持仓模式
-//     pub async fn setting_dual_mode(&mut self) -> ResponseData {
-//         ResponseData::error(self.label.clone(), "设置双向持仓失败, coinex没有设置双向持仓".to_string())
-//     }
-//     //更新双仓模式下的杠杆
-//     pub async fn setting_dual_leverage(&mut self, market: String, leverage: i32) -> ResponseData {
-//         let params = serde_json::json!({
-//                 "market": market,
-//                 "market_type": "FUTURES",
-//                 // cross: 全仓。全仓模式下,合约账户的全部可用余额都可用作当前全部仓位的共享保证金,系统会使用合约账户中的可用余额自动追加保证金,以避免仓位被强平
-//                 //isolated: 逐仓。逐仓模式下,仓位保证金不会共享,单个仓位的保证金仅用于当前仓位,系统不会自动追加保证金,需要手动追加。
-//                 "margin_mode": "cross",
-//                 "leverage":leverage,
-//              });
-//         let data = self.request("POST".to_string(),
-//                                 "/v2".to_string(),
-//                                 "/futures/adjust-position-leverage".to_string(),
-//                                 true,
-//                                 None,
-//                                 Some(params.to_string())
-//         ).await;
-//         data
-//     }
-//
-//     //撤销单个订单
-//     pub async fn cancel_order(&mut self, market: String, order_id: &str, client_id: &str) -> ResponseData {
-//         if order_id != "" {  // 如果真实订单id不为空,则用真实订单id取消订单
-//             let id = order_id.parse::<i64>().unwrap();
-//             let params = serde_json::json!({
-//                 "market": market,
-//                 "market_type": "FUTURES",
-//                 "order_id": id
-//             });
-//             let data = self.request("POST".to_string(),
-//                                     "/v2".to_string(),
-//                                     "/futures/cancel-order".to_string(),
-//                                     true,
-//                                     None,
-//                                     Some(params.to_string())
-//             ).await;
-//             data
-//         } else if client_id != "" {  // 如果客户端id不为空,则用客户端id取消订单
-//             let params = serde_json::json!({
-//                 "market": market,
-//                 "market_type": "FUTURES",
-//                 "client_id": client_id
-//             });
-//
-//             let mut data = self.request("POST".to_string(),
-//                                     "/v2".to_string(),
-//                                     "/futures/cancel-order-by-client-id".to_string(),
-//                                     true,
-//                                     None,
-//                                     Some(params.to_string())
-//             ).await;
-//             // 非空的
-//             if data.code == 200 && !data.data.is_null() {
-//                 data.data = data.data.as_array().unwrap()[0]["data"].clone();
-//             }
-//             data
-//         } else {  // 否则返回错误
-//             error!("取消订单失败失败,id异常");
-//             ResponseData::error(self.label.clone(), format!("取消订单失败失败, orderId:{:?}, clientId: {:?} ", order_id, client_id))
-//         }
-//     }
-//
-//     // 撤销所有挂单
-//     pub async fn cancel_order_all(&mut self, market: String) -> ResponseData {
-//         let params = serde_json::json!({
-//             "market": market,
-//             "market_type": "FUTURES"
-//         });
-//         let data = self.request("POST".to_string(),
-//                                 "/v2".to_string(),
-//                                 "/futures/cancel-all-order".to_string(),
-//                                 true,
-//                                 None,
-//                                 Some(params.to_string())
-//         ).await;
-//         data
-//     }
-//
-//     //查询个人成交记录
-//     pub async fn my_trades(&mut self, market: String, limit: i64) -> ResponseData {
-//         let mut params = serde_json::json!({
-//             "market": market,
-//             "market_type": "FUTURES",
-//             "limit": 1000
-//         });
-//         if limit > 0 {
-//             params["limit"] = serde_json::json!(limit);
-//         }
-//
-//         let data = self.request("GET".to_string(),
-//                                 "/v2".to_string(),
-//                                 "/futures/user-deals".to_string(),
-//                                 true,
-//                                 Some(params.to_string()), None).await;
-//         data
-//     }
-//
-//     //查询合约账户变更历史
-//     pub async fn account_book(&mut self) -> ResponseData {
-//         error!("查询合约账户变更历史失败,无实现");
-//         ResponseData::error(self.label.clone(), "查询合约账户变更历史失败,接口没实现".to_string())
-//     }
-//
-//
-//     /*******************************************************************************************************/
-//     /*****************************************工具函数********************************************************/
-//     /*******************************************************************************************************/
-//     pub fn get_delays(&self) -> Vec<i64> {
-//         self.delays.clone()
-//     }
-//     pub fn get_avg_delay(&self) -> Decimal {
-//         self.avg_delay.clone()
-//     }
-//     pub fn get_max_delay(&self) -> i64 {
-//         self.max_delay.clone()
-//     }
-//     // fn get_delay_info(&mut self) {
-//     //     let last_100 = if self.delays.len() > 100 {
-//     //         self.delays[self.delays.len() - 100..].to_vec()
-//     //     } else {
-//     //         self.delays.clone()
-//     //     };
-//     //
-//     //     let max_value = last_100.iter().max().unwrap();
-//     //     if max_value.clone().to_owned() > self.max_delay {
-//     //         self.max_delay = max_value.clone().to_owned();
-//     //     }
-//     //
-//     //     let sum: i64 = last_100.iter().sum();
-//     //     let sum_v = Decimal::from_i64(sum).unwrap();
-//     //     let len_v = Decimal::from_u64(last_100.len() as u64).unwrap();
-//     //     self.avg_delay = (sum_v / len_v).round_dp(1);
-//     //     self.delays = last_100.clone().into_iter().collect();
-//     // }
-//
-//     //调用请求
-//     async fn request(&mut self,
-//                      request_type: String,
-//                      prefix_url: String,
-//                      request_url: String,
-//                      is_login: bool,
-//                      params: Option<String>,
-//                      body: Option<String>) -> ResponseData
-//     {
-//         // trace!("login_param:{:?}", self.login_param);
-//         //解析账号信息
-//         let mut access_key = "".to_string();
-//         let mut secret_key = "".to_string();
-//         if self.login_param.contains_key("access_key") {
-//             access_key = self.login_param.get("access_key").unwrap().to_string();
-//         }
-//         if self.login_param.contains_key("secret_key") {
-//             secret_key = self.login_param.get("secret_key").unwrap().to_string();
-//         }
-//         let mut is_login_param = true;
-//         if access_key == "" || secret_key == "" {
-//             is_login_param = false
-//         }
-//
-//         let timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis().to_string();
-//         // url
-//         let mut url_and_query = format!("{}{}", prefix_url.clone(), request_url.clone());
-//         let mut headers = HeaderMap::new();
-//         headers.insert("Content-Type", HeaderValue::from_static("application/json"));
-//         headers.insert(
-//             "X-COINEX-KEY",
-//             HeaderValue::from_str(&self.login_param.get("access_key").unwrap()).unwrap(),
-//         );
-//         headers.insert(
-//             "X-COINEX-TIMESTAMP",
-//             HeaderValue::from_str(&timestamp).unwrap(),
-//         );
-//
-//         if let Some(params) = params {
-//             let query = RestTool::parse_params_to_str(params);
-//             url_and_query = format!("{}?{}", url_and_query, query);
-//         }
-//         let body_s = if let Some(body) = body {
-//             body
-//         } else {
-//             "".to_string()
-//         };
-//
-//         //是否需要登陆-- 组装sing
-//         if is_login {
-//             if !is_login_param {
-//                 let e = ResponseData::error(self.label.clone(), "登陆参数错误!".to_string());
-//                 return e;
-//             } else {//需要登陆-且登陆参数齐全
-//                 //组装sing
-//                 let sing = Self::sign(
-//                     &request_type,
-//                     &url_and_query,
-//                     &body_s,
-//                     timestamp.clone(),
-//                     &secret_key
-//                 );
-//                 // trace!("sing:{}", sing);
-//                 //组装header
-//                 headers.insert("X-COINEX-SIGN", HeaderValue::from_str(&sing.unwrap()).unwrap());
-//             }
-//         }
-//
-//         let start_time = chrono::Utc::now().timestamp_millis();
-//         let response = self.http_toll(
-//             url_and_query,
-//             request_type,
-//             body_s.clone(),
-//             headers,
-//         ).await;
-//
-//         let time_array = chrono::Utc::now().timestamp_millis() - start_time;
-//         self.delays.push(time_array);
-//         // self.get_delay_info();
-//
-//         response
-//     }
-//     fn sign(
-//         method: &String,
-//         path: &String,
-//         body: &String,
-//         timestamp: String,
-//         secret_key: &String
-//     ) -> Result<String, Box<dyn Error>> {
-//         let prepared_str = format!(
-//             "{}{}{}{}{}",
-//             method, path, body, timestamp, secret_key
-//         );
-//         let hash = Sha256::digest(prepared_str.as_bytes());
-//         Ok(hex::encode(hash))
-//     }
-//
-//     async fn http_toll(&mut self, request_path: String, request_type: String, body: String, headers: HeaderMap) -> ResponseData {
-//         /****请求接口与 地址*/
-//         let url = format!("{}{}", self.base_url.to_string(), request_path);
-//         let request_type = request_type.clone().to_uppercase();
-//
-//
-//         let request_builder = match request_type.as_str() {
-//             "GET" => self.client.get(&url).headers(headers),
-//             "POST" => self.client.post(&url).body(body.clone()).headers(headers),
-//             "DELETE" => self.client.delete(&url).headers(headers),
-//             // "PUT" => self.client.put(url.clone()).json(&params),
-//             _ => {
-//                 panic!("{}", format!("错误的请求类型:{}", request_type.clone()))
-//             }
-//         };
-//
-//         // 读取响应的内容
-//         let res = request_builder.send().await;
-//         match res {
-//             Ok(response) => {
-//                 let is_success = response.status().is_success(); // 先检查状态码
-//                 let text_result = response.text().await;
-//                 match text_result {
-//                     Ok(text) => {
-//                         let data_json_str: Result<Value, serde_json::Error> = serde_json::from_str(text.as_str());
-//                         match data_json_str {
-//                             Ok(data_json) => {
-//                                 return if is_success && data_json["code"].to_string() == "0"{
-//                                     self.on_success_data(data_json["data"].clone())
-//                                 } else {
-//                                     self.on_error_data(&text, &url, &body)
-//                                 }
-//                             },
-//                             Err(e) => {
-//                                 error!("{} 请求完成,解析响应内容JSON失败 {} {}", url, text.as_str(), e);
-//                                 self.on_error_data(&e.to_string(), &url, &body)
-//                             }
-//                         }
-//
-//                     },
-//                     Err(e) => {
-//                         error!("{} 请求完成,解析响应内容失败 {}", url, e);
-//                         self.on_error_data(&e.to_string(), &url, &body)
-//                     }
-//                 }
-//             },
-//             Err(e) => {// 异常情况
-//                 error!("{} 请求失败,网络错误 {}", url, e);
-//                 self.on_error_data(&e.to_string(), &url, &body)
-//             }
-//         }
-//     }
-//
-//     pub fn on_success_data(&mut self, text: Value) -> ResponseData {
-//         ResponseData::new(self.label.clone(),
-//                           200,
-//                           "success".to_string(),
-//                           text)
-//     }
-//
-//     pub fn on_error_data(&mut self, text: &String, base_url: &String, params: &String) -> ResponseData {
-//         let json_value = serde_json::from_str::<Value>(&text);
-//
-//         match json_value {
-//             Ok(data) => {
-//                 let message;
-//                 if !data["message"].is_null() {
-//                     message = format!("{}:{}", data["code"].to_string(), data["message"].as_str().unwrap());
-//                 } else {
-//                     message = data["code"].to_string();
-//                 }
-//                 let mut error = ResponseData::error(self.label.clone(), message);
-//                 error.message = format!("请求地址:{}, 请求参数:{}, 报错内容:{}。", base_url, params, error.message);
-//                 error
-//             }
-//             Err(e) => {
-//                 error!("解析错误:{:?}", e);
-//                 let error = ResponseData::error("".to_string(),format!("json 解析失败:{},相关参数:{}", e, text));
-//                 error
-//             }
-//         }
-//     }
-// }
+use std::collections::BTreeMap;
+use std::error::Error;
+use std::time::{SystemTime, UNIX_EPOCH};
+use reqwest::header::{HeaderMap, HeaderValue};
+use hex;
+use reqwest::Client;
+use rust_decimal::Decimal;
+use rust_decimal_macros::dec;
+use serde_json::Value;
+use crate::http_tool::RestTool;
+use crate::response_base::ResponseData;
+use sha2::{Digest, Sha256};
+use tracing::{error};
+
+#[derive(Clone)]
+pub struct CoinexSwapRest {
+    tag: String,
+    base_url: String,
+    client: Client,
+    /*******参数*/
+    //登陆所需参数
+    login_param: BTreeMap<String, String>,
+    delays: Vec<i64>,
+    max_delay: i64,
+    avg_delay: Decimal,
+}
+
+impl CoinexSwapRest {
+    /*******************************************************************************************************/
+    /*****************************************获取一个对象****************************************************/
+    /*******************************************************************************************************/
+    pub fn new(login_param: BTreeMap<String, String>) -> CoinexSwapRest
+    {
+        return CoinexSwapRest::new_with_tag("default-CoinexSwapRest".to_string(), login_param);
+    }
+    pub fn new_with_tag(tag: String, login_param: BTreeMap<String, String>) -> CoinexSwapRest
+    {
+        let base_url: String = String::from("https://api.coinex.com");
+
+        /*****返回结构体*******/
+        CoinexSwapRest {
+            tag,
+            base_url,
+            client: Client::new(),
+            login_param,
+            delays: vec![],
+            max_delay: 0,
+            avg_delay: dec!(0.0),
+        }
+    }
+
+    /*******************************************************************************************************/
+    /*****************************************rest请求函数********************************************************/
+    /*******************************************************************************************************/
+    //获取服务器当前时间
+    pub async fn get_server_time(&mut self) -> ResponseData {
+
+        let data = self.request("GET".to_string(),
+                                "/v2".to_string(),
+                                "/time".to_string(),
+                                false,
+                                None,
+                                None
+        ).await;
+        data
+    }
+    //查询个人交易费率
+    pub async fn wallet_fee(&mut self) -> ResponseData {
+        let data = self.request("GET".to_string(),
+                                "/v2".to_string(),
+                                "/account/trade-fee-rate".to_string(),
+                                true,
+                                None,
+                                None
+        ).await;
+        data
+    }
+    //查询合约账户
+    pub async fn get_account(&mut self) -> ResponseData {
+        let data = self.request("GET".to_string(),
+                                "/v2".to_string(),
+                                "/assets/futures/balance".to_string(),
+                                true,
+                                None,
+                                None
+        ).await;
+        data
+    }
+
+    //指定币对仓位列表
+    pub async fn get_position(&mut self, market: String) -> ResponseData {
+        let params = serde_json::json!({
+            "market": market,
+            "market_type": "FUTURES"
+        });
+        let data = self.request("GET".to_string(),
+                                "/v2".to_string(),
+                                "/futures/pending-position".to_string(),
+                                true,
+                                Some(params.to_string()),
+                                None
+        ).await;
+        data
+    }
+
+    //用户仓位列表
+    pub async fn get_user_position(&mut self) -> ResponseData {
+        let params = serde_json::json!({
+            "market_type": "FUTURES"
+        });
+        let data = self.request("GET".to_string(),
+                                "/v2".to_string(),
+                                "/futures/pending-position".to_string(),
+                                true,
+                                Some(params.to_string()),
+                                None
+        ).await;
+        data
+    }
+
+    //获取所有合约交易行情统计 market 市场名列表,多个市场名之间使用英文","分隔,空字符串或不传表示查询全部市场,限制最多10个市场
+    pub async fn get_ticker(&mut self, market: String) -> ResponseData {
+        let params = serde_json::json!({
+            "market": market
+         });
+        let data = self.request("GET".to_string(),
+                                "/v2".to_string(),
+                                "/futures/ticker".to_string(),
+                                false,
+                                Some(params.to_string()),
+                                None
+        ).await;
+        data
+    }
+    //查询所有的合约信息
+    pub async fn get_market_details(&mut self, market: String) -> ResponseData {
+        let params = serde_json::json!({
+            "market": market
+        });
+        let data = self.request("GET".to_string(),
+                                "/v2".to_string(),
+                                "/futures/market".to_string(),
+                                false,
+                                Some(params.to_string()),
+                                None
+        ).await;
+        data
+    }
+    //查询单个订单详情  /spot/order-status?market=CETUSDT&order_id=13400
+    pub async fn get_order_details(&mut self, order_id: String, market: String) -> ResponseData {
+        let params = serde_json::json!({
+            "market": market,
+            "order_id": order_id
+         });
+        let data = self.request("GET".to_string(),
+                                "/v2".to_string(),
+                                "/futures/order-status".to_string(),
+                                true,
+                                Some(params.to_string()),
+                                None
+        ).await;
+        data
+    }
+    //查询未完成合约订单 /futures/pending-order?market=CETUSDT&market_type=FUTURES&side=buy&page=1&limit=10
+    pub async fn get_pending_order(&mut self, client_id: String) -> ResponseData {
+        let params = serde_json::json!({
+            "market_type": "FUTURES",
+            "client_id": client_id,
+            "limit": 10
+         });
+        let data = self.request("GET".to_string(),
+                                "/v2".to_string(),
+                                "/futures/pending-order".to_string(),
+                                true,
+                                Some(params.to_string()),
+                                None
+        ).await;
+        data
+    }
+
+    pub async fn get_pending_orders(&mut self) -> ResponseData {
+        let params = serde_json::json!({
+            "market_type": "FUTURES",
+            "limit": 100
+         });
+        let data = self.request("GET".to_string(),
+                                "/v2".to_string(),
+                                "/futures/pending-order".to_string(),
+                                true,
+                                Some(params.to_string()),
+                                None
+        ).await;
+        data
+    }
+
+    pub async fn get_finished_orders(&mut self) -> ResponseData {
+        let params = serde_json::json!({
+            "market_type": "FUTURES",
+            "limit": 100
+         });
+        let data = self.request("GET".to_string(),
+                                "/v2".to_string(),
+                                "/futures/finished-order".to_string(),
+                                true,
+                                Some(params.to_string()),
+                                None
+        ).await;
+        data
+    }
+
+    //下单
+    //  coinex swap 平仓需考虑最小下单量 只能通过close_position和position_id来平仓
+    pub async fn order(&mut self,
+                       market: String,
+                       pos_side: String,
+                       side: String,
+                       size: Decimal,
+                       price: Decimal,
+                       client_id: String
+    ) -> ResponseData
+    {
+        // 默认为限价单
+        let mut type_y = "limit".to_string();
+        // 0为市价单,
+        if price == Decimal::ZERO {
+            type_y = "market".to_string();
+        }
+        let data;
+
+
+        match format!("{}_{}", pos_side, side).as_str() {
+            "long_buy" => {//开多
+                data = self.swap_order(market, side, type_y, size, price, client_id, false).await;
+            }
+            "long_sell" => {//平多
+                data = self.close_position(market, type_y,  price, client_id, false).await;
+            }
+            "short_buy" => {//平空
+                data = self.close_position(market, type_y, price, client_id, false).await;
+            }
+            "short_sell" => {//开空
+                data = self.swap_order(market, side, type_y, size, price, client_id, false).await;
+            }
+            _ => {// 处理未知请求类型
+                error!("下单失败,数量异常! size: {}", size);
+                data = ResponseData::error(self.tag.clone(), format!("下单失败, 下单参数: <market: {:?}, pos_side: {:?}, side: {:?}, size: {}, price: {:?}, client_id: {:?}>", market, pos_side, side, size, price, client_id));
+            }
+        };
+        data
+    }
+
+    // 平仓下单
+    pub async fn close_position(&mut self, market: String, type_y : String,  price: Decimal, client_id: String, is_hide: bool) -> ResponseData {
+        // 数量不传为全平
+        let param = serde_json::json!({
+            "market":market,
+            "market_type": "FUTURES",
+            "type": type_y,
+            "price":price,
+            "client_id":client_id,
+            "is_hide": is_hide
+        });
+
+        let data = self.request("POST".to_string(),
+                                "/v2".to_string(),
+                                "/futures/close-position".to_string(),
+                                true,
+                                None,
+                                Some(param.to_string())
+        ).await;
+        data
+    }
+
+    //合约交易开仓下单
+    pub async fn swap_order(&mut self, market: String, side: String, type_y : String, amount: Decimal, price: Decimal, client_id: String, is_hide: bool) -> ResponseData {
+        let param = serde_json::json!({
+            "market":market,
+            "market_type": "FUTURES",
+            "side": side,
+            "type": type_y,
+            "amount":amount,
+            "price":price,
+            "client_id":client_id,
+            "is_hide": is_hide
+        });
+
+        let data = self.request("POST".to_string(),
+                                "/v2".to_string(),
+                                "/futures/order".to_string(),
+                                true,
+                                None,
+                                Some(param.to_string())
+        ).await;
+        data
+    }
+
+    //设置持仓模式
+    pub async fn setting_dual_mode(&mut self) -> ResponseData {
+        ResponseData::error(self.tag.clone(), "设置双向持仓失败, coinex没有设置双向持仓".to_string())
+    }
+    //更新双仓模式下的杠杆
+    pub async fn setting_dual_leverage(&mut self, market: String, leverage: i32) -> ResponseData {
+        let params = serde_json::json!({
+                "market": market,
+                "market_type": "FUTURES",
+                // cross: 全仓。全仓模式下,合约账户的全部可用余额都可用作当前全部仓位的共享保证金,系统会使用合约账户中的可用余额自动追加保证金,以避免仓位被强平
+                //isolated: 逐仓。逐仓模式下,仓位保证金不会共享,单个仓位的保证金仅用于当前仓位,系统不会自动追加保证金,需要手动追加。
+                "margin_mode": "cross",
+                "leverage":leverage,
+             });
+        let data = self.request("POST".to_string(),
+                                "/v2".to_string(),
+                                "/futures/adjust-position-leverage".to_string(),
+                                true,
+                                None,
+                                Some(params.to_string())
+        ).await;
+        data
+    }
+
+    //撤销单个订单
+    pub async fn cancel_order(&mut self, market: String, order_id: &str, client_id: &str) -> ResponseData {
+        if order_id != "" {  // 如果真实订单id不为空,则用真实订单id取消订单
+            let id = order_id.parse::<i64>().unwrap();
+            let params = serde_json::json!({
+                "market": market,
+                "market_type": "FUTURES",
+                "order_id": id
+            });
+            let data = self.request("POST".to_string(),
+                                    "/v2".to_string(),
+                                    "/futures/cancel-order".to_string(),
+                                    true,
+                                    None,
+                                    Some(params.to_string())
+            ).await;
+            data
+        } else if client_id != "" {  // 如果客户端id不为空,则用客户端id取消订单
+            let params = serde_json::json!({
+                "market": market,
+                "market_type": "FUTURES",
+                "client_id": client_id
+            });
+
+            let mut data = self.request("POST".to_string(),
+                                    "/v2".to_string(),
+                                    "/futures/cancel-order-by-client-id".to_string(),
+                                    true,
+                                    None,
+                                    Some(params.to_string())
+            ).await;
+            // 非空的
+            if data.code == 200 && !data.data.is_null() {
+                data.data = data.data.as_array().unwrap()[0]["data"].clone();
+            }
+            data
+        } else {  // 否则返回错误
+            error!("取消订单失败失败,id异常");
+            ResponseData::error(self.tag.clone(), format!("取消订单失败失败, orderId:{:?}, clientId: {:?} ", order_id, client_id))
+        }
+    }
+
+    // 撤销所有挂单
+    pub async fn cancel_order_all(&mut self, market: String) -> ResponseData {
+        let params = serde_json::json!({
+            "market": market,
+            "market_type": "FUTURES"
+        });
+        let data = self.request("POST".to_string(),
+                                "/v2".to_string(),
+                                "/futures/cancel-all-order".to_string(),
+                                true,
+                                None,
+                                Some(params.to_string())
+        ).await;
+        data
+    }
+
+    //查询个人成交记录
+    pub async fn my_trades(&mut self, market: String, limit: i64) -> ResponseData {
+        let mut params = serde_json::json!({
+            "market": market,
+            "market_type": "FUTURES",
+            "limit": 1000
+        });
+        if limit > 0 {
+            params["limit"] = serde_json::json!(limit);
+        }
+
+        let data = self.request("GET".to_string(),
+                                "/v2".to_string(),
+                                "/futures/user-deals".to_string(),
+                                true,
+                                Some(params.to_string()), None).await;
+        data
+    }
+
+    //查询合约账户变更历史
+    pub async fn account_book(&mut self) -> ResponseData {
+        error!("查询合约账户变更历史失败,无实现");
+        ResponseData::error(self.tag.clone(), "查询合约账户变更历史失败,接口没实现".to_string())
+    }
+
+
+    /*******************************************************************************************************/
+    /*****************************************工具函数********************************************************/
+    /*******************************************************************************************************/
+    pub fn get_delays(&self) -> Vec<i64> {
+        self.delays.clone()
+    }
+    pub fn get_avg_delay(&self) -> Decimal {
+        self.avg_delay.clone()
+    }
+    pub fn get_max_delay(&self) -> i64 {
+        self.max_delay.clone()
+    }
+    // fn get_delay_info(&mut self) {
+    //     let last_100 = if self.delays.len() > 100 {
+    //         self.delays[self.delays.len() - 100..].to_vec()
+    //     } else {
+    //         self.delays.clone()
+    //     };
+    //
+    //     let max_value = last_100.iter().max().unwrap();
+    //     if max_value.clone().to_owned() > self.max_delay {
+    //         self.max_delay = max_value.clone().to_owned();
+    //     }
+    //
+    //     let sum: i64 = last_100.iter().sum();
+    //     let sum_v = Decimal::from_i64(sum).unwrap();
+    //     let len_v = Decimal::from_u64(last_100.len() as u64).unwrap();
+    //     self.avg_delay = (sum_v / len_v).round_dp(1);
+    //     self.delays = last_100.clone().into_iter().collect();
+    // }
+
+    //调用请求
+    async fn request(&mut self,
+                     request_type: String,
+                     prefix_url: String,
+                     request_url: String,
+                     is_login: bool,
+                     params: Option<String>,
+                     body: Option<String>) -> ResponseData
+    {
+        let mut url_and_query = format!("{}{}", prefix_url.clone(), request_url.clone());
+        let body_s = if let Some(body) = body {
+            body
+        } else {
+            "".to_string()
+        };
+        let mut headers = HeaderMap::new();
+
+        //是否需要登陆-- 组装sing
+        if is_login {
+            // trace!("login_param:{:?}", self.login_param);
+            //解析账号信息
+            let mut access_key = "".to_string();
+            let mut secret_key = "".to_string();
+            if self.login_param.contains_key("access_key") {
+                access_key = self.login_param.get("access_key").unwrap().to_string();
+            }
+            if self.login_param.contains_key("secret_key") {
+                secret_key = self.login_param.get("secret_key").unwrap().to_string();
+            }
+            let mut is_login_param = true;
+            if access_key == "" || secret_key == "" {
+                is_login_param = false
+            }
+
+            let timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis().to_string();
+            // url
+            headers.insert("Content-Type", HeaderValue::from_static("application/json"));
+            headers.insert(
+                "X-COINEX-KEY",
+                HeaderValue::from_str(&self.login_param.get("access_key").unwrap()).unwrap(),
+            );
+            headers.insert(
+                "X-COINEX-TIMESTAMP",
+                HeaderValue::from_str(&timestamp).unwrap(),
+            );
+
+            if let Some(params) = params {
+                let query = RestTool::parse_params_to_str(params);
+                url_and_query = format!("{}?{}", url_and_query, query);
+            }
+            if !is_login_param {
+                let e = ResponseData::error(self.tag.clone(), "登陆参数错误!".to_string());
+                return e;
+            } else {//需要登陆-且登陆参数齐全
+                //组装sing
+                let sing = Self::sign(
+                    &request_type,
+                    &url_and_query,
+                    &body_s,
+                    timestamp.clone(),
+                    &secret_key
+                );
+                // trace!("sing:{}", sing);
+                //组装header
+                headers.insert("X-COINEX-SIGN", HeaderValue::from_str(&sing.unwrap()).unwrap());
+            }
+        }
+
+        let start_time = chrono::Utc::now().timestamp_millis();
+        let response = self.http_toll(
+            url_and_query,
+            request_type,
+            body_s.clone(),
+            headers,
+        ).await;
+
+        let time_array = chrono::Utc::now().timestamp_millis() - start_time;
+        self.delays.push(time_array);
+        // self.get_delay_info();
+
+        response
+    }
+    fn sign(
+        method: &String,
+        path: &String,
+        body: &String,
+        timestamp: String,
+        secret_key: &String
+    ) -> Result<String, Box<dyn Error>> {
+        let prepared_str = format!(
+            "{}{}{}{}{}",
+            method, path, body, timestamp, secret_key
+        );
+        let hash = Sha256::digest(prepared_str.as_bytes());
+        Ok(hex::encode(hash))
+    }
+
+    async fn http_toll(&mut self, request_path: String, request_type: String, body: String, headers: HeaderMap) -> ResponseData {
+        /****请求接口与 地址*/
+        let url = format!("{}{}", self.base_url.to_string(), request_path);
+        let request_type = request_type.clone().to_uppercase();
+
+
+        let request_builder = match request_type.as_str() {
+            "GET" => self.client.get(&url).headers(headers),
+            "POST" => self.client.post(&url).body(body.clone()).headers(headers),
+            "DELETE" => self.client.delete(&url).headers(headers),
+            // "PUT" => self.client.put(url.clone()).json(&params),
+            _ => {
+                panic!("{}", format!("错误的请求类型:{}", request_type.clone()))
+            }
+        };
+
+        // 读取响应的内容
+        let res = request_builder.send().await;
+        match res {
+            Ok(response) => {
+                let is_success = response.status().is_success(); // 先检查状态码
+                let text_result = response.text().await;
+                match text_result {
+                    Ok(text) => {
+                        let data_json_str: Result<Value, serde_json::Error> = serde_json::from_str(text.as_str());
+                        match data_json_str {
+                            Ok(data_json) => {
+                                return if is_success && data_json["code"].to_string() == "0"{
+                                    self.on_success_data(data_json["data"].clone())
+                                } else {
+                                    self.on_error_data(&text, &url, &body)
+                                }
+                            },
+                            Err(e) => {
+                                error!("{} 请求完成,解析响应内容JSON失败 {} {}", url, text.as_str(), e);
+                                self.on_error_data(&e.to_string(), &url, &body)
+                            }
+                        }
+
+                    },
+                    Err(e) => {
+                        error!("{} 请求完成,解析响应内容失败 {}", url, e);
+                        self.on_error_data(&e.to_string(), &url, &body)
+                    }
+                }
+            },
+            Err(e) => {// 异常情况
+                error!("{} 请求失败,网络错误 {}", url, e);
+                self.on_error_data(&e.to_string(), &url, &body)
+            }
+        }
+    }
+
+    pub fn on_success_data(&mut self, text: Value) -> ResponseData {
+        ResponseData::new(self.tag.clone(),
+                          200,
+                          "success".to_string(),
+                          text)
+    }
+
+    pub fn on_error_data(&mut self, text: &String, base_url: &String, params: &String) -> ResponseData {
+        let json_value = serde_json::from_str::<Value>(&text);
+
+        match json_value {
+            Ok(data) => {
+                let message;
+                if !data["message"].is_null() {
+                    message = format!("{}:{}", data["code"].to_string(), data["message"].as_str().unwrap());
+                } else {
+                    message = data["code"].to_string();
+                }
+                let mut error = ResponseData::error(self.tag.clone(), message);
+                error.message = format!("请求地址:{}, 请求参数:{}, 报错内容:{}。", base_url, params, error.message);
+                error
+            }
+            Err(e) => {
+                error!("解析错误:{:?}", e);
+                let error = ResponseData::error("".to_string(),format!("json 解析失败:{},相关参数:{}", e, text));
+                error
+            }
+        }
+    }
+}

+ 26 - 24
exchanges/src/coinex_swap_ws.rs

@@ -51,7 +51,7 @@ pub struct CoinexSwapLogin {
 #[derive(Clone)]
 pub struct CoinexSwapWs {
     //类型
-    label: String,
+    tag: String,
     //地址
     address_url: String,
     //账号信息
@@ -61,7 +61,7 @@ pub struct CoinexSwapWs {
     //订阅
     subscribe_types: Vec<CoinexSwapSubscribeType>,
     //心跳间隔
-    heartbeat_time: u64
+    heartbeat_time: u64,
 }
 
 
@@ -70,21 +70,21 @@ impl CoinexSwapWs {
     /*****************************************实例化一个对象****************************************************/
     /*******************************************************************************************************/
     pub fn new(login_param: Option<CoinexSwapLogin>) -> CoinexSwapWs {
-        return CoinexSwapWs::new_label("default-CoinexSwapWs".to_string(), login_param);
+        return CoinexSwapWs::new_with_tag("default-CoinexSwapWs".to_string(), login_param);
     }
 
-    pub fn new_label(label: String, login_param: Option<CoinexSwapLogin>) -> CoinexSwapWs
+    pub fn new_with_tag(tag: String, login_param: Option<CoinexSwapLogin>) -> CoinexSwapWs
     {
         /*******公共频道-私有频道数据组装*/
         let address_url = "wss://socket.coinex.com/v2/futures".to_string();
         info!("走普通通道(不支持colo通道):{}", address_url);
         CoinexSwapWs {
-            label,
+            tag,
             address_url,
             login_param,
             symbol_s: vec![],
             subscribe_types: vec![],
-            heartbeat_time: 1000 * 10
+            heartbeat_time: 1000 * 10,
         }
     }
 
@@ -187,14 +187,16 @@ impl CoinexSwapWs {
     pub fn get_subscription(&self) -> Vec<Value> {
         let mut args = vec![];
         // 只获取第一个
-        let symbol = self.symbol_s.get(0).unwrap().replace("_", "").to_uppercase();
-
-        for subscribe_type in &self.subscribe_types {
-            let ty_str = Self::enum_to_string(symbol.clone(),
-                                              subscribe_type.clone(),
-                                              self.login_param.clone(),
-            );
-            args.push(ty_str);
+        for symbol in &self.symbol_s {
+            let symbol_final = symbol.replace("_", "").to_uppercase();
+
+            for subscribe_type in &self.subscribe_types {
+                let ty_str = Self::enum_to_string(symbol_final.clone(),
+                                                  subscribe_type.clone(),
+                                                  self.login_param.clone(),
+                );
+                args.push(ty_str);
+            }
         }
         args
     }
@@ -216,7 +218,7 @@ impl CoinexSwapWs {
         let login_param_clone = self.login_param.clone();
         let subscription = self.get_subscription();
         let address_url = self.address_url.clone();
-        let label = self.label.clone();
+        let tag = self.tag.clone();
         let heartbeat_time = self.heartbeat_time.clone();
 
 
@@ -284,7 +286,7 @@ impl CoinexSwapWs {
                 }
 
                 AbstractWsMode::ws_connect_async(is_shutdown_arc.clone(), handle_function.clone(), address_url.clone(),
-                                                 login_is, label.clone(), subscribe_array.clone(), write_to_socket_rx_arc.clone(),
+                                                 login_is, tag.clone(), subscribe_array.clone(), write_to_socket_rx_arc.clone(),
                                                  Self::message_text_sync, Self::message_ping, Self::message_pong, Self::message_binary_sync).await;
                 let mut login_data = LOGIN_DATA.lock().await;
                 // 断联后 设置为没有登录
@@ -343,34 +345,34 @@ impl CoinexSwapWs {
 
         let obj = json_value["method"].as_str();
         match obj {
-            Some(v)=> {
+            Some(v) => {
                 res_data.channel = format!("{}", v);
                 res_data.code = 200;
                 res_data.data = json_value["data"].clone();
-            },
+            }
             None => {
                 // 认证的响应没有method,只能通过id和code判断
                 match json_value["id"].as_i64() {
                     Some(1) => {
                         match json_value["code"].as_i64() {
-                            Some(0) =>{
+                            Some(0) => {
                                 match json_value["data"].as_str() {
                                     None => {
                                         // 登录成功逻辑处理
                                         let mut login_data = LOGIN_DATA.lock().await;
                                         if login_data.0 { // 需要登录
-                                            if !login_data.1{
+                                            if !login_data.1 {
                                                 login_data.1 = true;
                                                 res_data.channel = "server.sign".to_string();
                                                 res_data.code = -200;
-                                            }else {
+                                            } else {
                                                 res_data.code = 400;
                                             }
-                                        }  else { // 不需要登录
+                                        } else { // 不需要登录
                                             res_data.code = 200;
                                         }
                                     }
-                                    _ =>{
+                                    _ => {
                                         res_data.code = 400;
                                     }
                                 }
@@ -390,7 +392,7 @@ impl CoinexSwapWs {
         res_data
     }
 
-    fn parse_zip_data(p0: Vec<u8>) -> String{
+    fn parse_zip_data(p0: Vec<u8>) -> String {
         // 创建一个GzDecoder的实例,将压缩数据作为输入
         let mut decoder = GzDecoder::new(&p0[..]);
 

+ 2 - 2
exchanges/src/socket_tool.rs

@@ -164,7 +164,7 @@ impl AbstractWsMode {
                                                            handle_function: F,
                                                            address_url: String,
                                                            is_first_login: bool,
-                                                           label: String,
+                                                           tag: String,
                                                            subscribe_array: Vec<String>,
                                                            write_to_socket_rx_arc: Arc<Mutex<UnboundedReceiver<Message>>>,
                                                            message_text: T,
@@ -197,7 +197,7 @@ impl AbstractWsMode {
 
                 Self::ws_connected(write_to_socket_rx_arc,
                                    is_first_login,
-                                   label,
+                                   tag,
                                    is_shutdown_arc,
                                    &handle_function,
                                    subscribe_array.clone(),

+ 151 - 0
src/coinex_usdt_swap_data_listener.rs

@@ -0,0 +1,151 @@
+use std::cmp::{max, min};
+use std::collections::{BTreeMap, HashMap};
+use std::str::FromStr;
+use std::sync::{Arc};
+use std::sync::atomic::AtomicBool;
+use lazy_static::lazy_static;
+use rust_decimal::Decimal;
+use tokio::sync::{Mutex, MutexGuard};
+use tracing::info;
+use exchanges::coinex_swap_rest::CoinexSwapRest;
+use exchanges::coinex_swap_ws::{CoinexSwapSubscribeType, CoinexSwapWs};
+use exchanges::response_base::ResponseData;
+use standard::{Record, SpecialTrade};
+use standard::exchange::ExchangeEnum;
+use standard::exchange_struct_handler::ExchangeStructHandler;
+use crate::listener_tools::{update_record, update_trade};
+
+// type DepthMap = HashMap<String, Vec<SpecialDepth>>;
+pub type TradeMap = HashMap<String, Vec<SpecialTrade>>;
+pub type RecordMap = HashMap<String, Record>;
+const EXCHANGE_NAME: &str = "coinex_usdt_swap";
+
+lazy_static! {
+    // static ref DEPTH_MAP: Mutex<DepthMap> = Mutex::new(HashMap::new());
+    static ref TRADES_MAP: Mutex<TradeMap> = Mutex::new(HashMap::new());
+    static ref RECORD_MAP: Mutex<RecordMap> = Mutex::new(HashMap::new());
+}
+
+pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>) {
+    let name = "coinex_usdt_swap_listener";
+    // 订阅所有币种
+    let login = BTreeMap::new();
+    let mut coinex_rest = CoinexSwapRest::new(login);
+    let response = coinex_rest.get_market_details("usdt".to_string()).await;
+    let mut symbols = vec![];
+    if response.code == 200 {
+        let data = response.data.as_array().unwrap();
+        for info in data {
+            symbols.push(info["market"].as_str().unwrap().to_string().replace("USDT", "_USDT"))
+        }
+    }
+    info!(?symbols);
+
+    for chunk in symbols.chunks(20) {
+        let ws_name = name.to_string();
+        let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
+        let write_tx_am = Arc::new(Mutex::new(write_tx));
+        let symbols_chunk = chunk.iter().cloned().collect::<Vec<String>>();
+        let is_shutdown_clone = Arc::clone(&is_shutdown_arc);
+
+        tokio::spawn(async move {
+            let mut ws = CoinexSwapWs::new_with_tag(ws_name, None);
+            ws.set_subscribe(vec![
+                CoinexSwapSubscribeType::PuFuturesDeals,
+                // GateSwapSubscribeType::PuFuturesOrderBook
+            ]);
+
+            // 建立链接
+            ws.set_symbols(symbols_chunk);
+            ws.ws_connect_async(is_shutdown_clone, data_listener, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+        });
+    }
+}
+
+// 读取数据
+pub async fn data_listener(response: ResponseData) {
+    if response.code != 200 {
+        return;
+    }
+
+    match response.channel.as_str() {
+        // 深度数据
+        "深度" => {
+            // let depth = ExchangeStructHandler::order_book_handle(ExchangeEnum::GateSwap, &response);
+            //
+            // update_depth(&depth).await;
+        },
+        // 订单流数据
+        "deals.update" => {
+            let trades = ExchangeStructHandler::trades_handle(ExchangeEnum::CoinexSwap, &response);
+
+            for trade in trades.iter() {
+                let trades_map = TRADES_MAP.lock().await;
+                // k线数据更新
+                let record = parse_trades_to_record(trade.symbol.clone(), trades_map);
+                let record_map= RECORD_MAP.lock().await;
+                update_record(&record, record_map, EXCHANGE_NAME).await;
+
+                // 订单流数据更新
+                let trades_map_1 = TRADES_MAP.lock().await;
+                update_trade(trade, trades_map_1, EXCHANGE_NAME).await;
+            }
+        },
+        // pong消息不处理
+        "" => {},
+        _ => {
+            info!("85 未知的数据类型: {:?}", response)
+        }
+    }
+}
+
+pub fn parse_trades_to_record(symbol: String, mut trades_map: MutexGuard<'_, TradeMap>) -> Record {
+    if let Some(trades) = trades_map.get_mut(symbol.as_str()) {
+        let mut time = Decimal::ZERO;
+        let mut open = Decimal::ZERO;
+        let mut high = Decimal::ZERO;
+        let mut low = Decimal::ZERO;
+        let mut close = Decimal::ZERO;
+        let mut volume = Decimal::ZERO;
+        let symbol = symbol.clone();
+
+        for trade in trades {
+            // time, open, high, low, close初始化
+            if time.eq(&Decimal::ZERO) {
+                time = Decimal::from_str(trade.inner()[1].as_str()).unwrap();
+                open = Decimal::from_str(trade.inner()[3].as_str()).unwrap();
+                high = Decimal::from_str(trade.inner()[3].as_str()).unwrap();
+                low = Decimal::from_str(trade.inner()[3].as_str()).unwrap();
+            }
+
+            // close
+            close = Decimal::from_str(trade.inner()[3].as_str()).unwrap();
+
+            // high
+            high = max(high, close);
+            low = min(low, close);
+            // volume
+            volume += Decimal::from_str(trade.inner()[2].as_str()).unwrap().abs();
+        }
+
+        return Record {
+            time,
+            open,
+            high,
+            low,
+            close,
+            volume,
+            symbol,
+        }
+    }
+
+    Record {
+        time: Default::default(),
+        open: Default::default(),
+        high: Default::default(),
+        low: Default::default(),
+        close: Default::default(),
+        volume: Default::default(),
+        symbol,
+    }
+}

+ 5 - 0
src/listener_tools.rs

@@ -1,4 +1,5 @@
 use std::str::FromStr;
+use rust_decimal::Decimal;
 use rust_decimal::prelude::ToPrimitive;
 use tokio::sync::MutexGuard;
 use standard::{Record, SpecialTrade, Trade};
@@ -35,6 +36,10 @@ pub async fn update_trade(new_trade: &Trade, mut trades_map: MutexGuard<'_, Trad
 
 // 更新k线
 pub async fn update_record(new_record: &Record, mut records_map: MutexGuard<'_, RecordMap>, exchange: &str) {
+    if new_record.time.eq(&Decimal::ZERO) {
+        return;
+    }
+
     // 如果k线记录存在于map,则进行一系列操作,用于保存map
     if let Some(record) = records_map.get_mut(new_record.symbol.as_str()) {
         let last_trade_minutes = record.time.to_i64().unwrap() / 60000;             // 将毫秒转换成分钟数

+ 2 - 0
src/main.rs

@@ -5,6 +5,7 @@ mod listener_tools;
 mod bitget_usdt_swap_data_listener;
 mod gate_usdt_swap_data_listener;
 mod binance_usdt_swap_data_listener;
+mod coinex_usdt_swap_data_listener;
 
 use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, Ordering};
@@ -32,6 +33,7 @@ async fn main() {
     gate_usdt_swap_data_listener::run_listener(running.clone()).await;
     bitget_usdt_swap_data_listener::run_listener(running.clone()).await;
     binance_usdt_swap_data_listener::run_listener(running.clone()).await;
+    coinex_usdt_swap_data_listener::run_listener(running.clone()).await;
     // panic错误捕获,panic级别的错误直接退出
     // let panic_running = running.clone();
     std::panic::set_hook(Box::new(move |panic_info| {

+ 626 - 0
standard/src/coinex_swap.rs

@@ -0,0 +1,626 @@
+use std::collections::{BTreeMap};
+use std::io::{Error, ErrorKind};
+use std::str::FromStr;
+use tokio::sync::mpsc::Sender;
+use async_trait::async_trait;
+use rust_decimal::Decimal;
+use rust_decimal::prelude::{FromPrimitive, ToPrimitive};
+use serde_json::{Value};
+use tracing::{error, info, trace};
+use exchanges::coinex_swap_rest::CoinexSwapRest;
+use crate::{Platform, ExchangeEnum, Account, Position, Ticker, Market, Order, PositionModeEnum};
+
+#[allow(dead_code)]
+#[derive(Clone)]
+pub struct CoinexSwap {
+    exchange: ExchangeEnum,
+    symbol: String,
+    is_colo: bool,
+    params: BTreeMap<String, String>,
+    request: CoinexSwapRest,
+    market: Market,
+    order_sender: Sender<Order>,
+    error_sender: Sender<Error>,
+}
+
+impl CoinexSwap {
+    pub async fn new(symbol: String, is_colo: bool, params: BTreeMap<String, String>, order_sender: Sender<Order>, error_sender: Sender<Error>) -> CoinexSwap {
+        let market = Market::new();
+        let mut coinex_swap = CoinexSwap {
+            exchange: ExchangeEnum::CoinexSwap,
+            symbol: symbol.to_uppercase(),
+            is_colo,
+            params: params.clone(),
+            request: CoinexSwapRest::new(params.clone()),
+            market,
+            order_sender,
+            error_sender,
+        };
+
+        // 修改持仓模式
+        let symbol_array: Vec<&str> = symbol.split("_").collect();
+        let mode_result = coinex_swap.set_dual_mode(symbol_array[1], true).await;
+        match mode_result {
+            Ok(_) => {
+                trace!("Coinex:设置持仓模式成功!")
+            }
+            Err(error) => {
+                error!("Coinex:设置持仓模式失败!mode_result={}", error)
+            }
+        }
+        // 获取市场信息
+        coinex_swap.market = CoinexSwap::get_market(&mut coinex_swap).await.unwrap_or(coinex_swap.market);
+        // 设置持仓杠杆
+        let lever_rate_result = coinex_swap.set_dual_leverage("10").await;
+        match lever_rate_result {
+            Ok(ok) => {
+                info!("Coinex:设置持仓杠杆成功!{:?}", ok);
+            }
+            Err(error) => {
+                error!("Coinex:设置持仓杠杆失败!{:?}", error)
+            }
+        }
+        return coinex_swap;
+    }
+}
+
+#[async_trait]
+impl Platform for CoinexSwap {
+    // 克隆方法
+    fn clone_box(&self) -> Box<dyn Platform + Send + Sync> { Box::new(self.clone()) }
+    // 获取交易所模式
+    fn get_self_exchange(&self) -> ExchangeEnum {
+        ExchangeEnum::CoinexSwap
+    }
+    // 获取交易对
+    fn get_self_symbol(&self) -> String { self.symbol.clone() }
+    // 获取是否使用高速通道
+    fn get_self_is_colo(&self) -> bool {
+        self.is_colo
+    }
+    // 获取params信息
+    fn get_self_params(&self) -> BTreeMap<String, String> {
+        self.params.clone()
+    }
+    // 获取market信息
+    fn get_self_market(&self) -> Market { self.market.clone() }
+    // 获取请求时间
+    fn get_request_delays(&self) -> Vec<i64> { self.request.get_delays() }
+    // 获取请求平均时间
+    fn get_request_avg_delay(&self) -> Decimal { self.request.get_avg_delay() }
+    // 获取请求最大时间
+    fn get_request_max_delay(&self) -> i64 { self.request.get_max_delay() }
+
+    // 获取服务器时间
+    async fn get_server_time(&mut self) -> Result<String, Error> {
+        let res_data = self.request.get_server_time().await;
+        if res_data.code == 200 {
+            let res_data_json: Value = res_data.data;
+            let result = res_data_json["timestamp"].to_string();
+            Ok(result)
+        } else {
+            Err(Error::new(ErrorKind::Other, res_data.to_string()))
+        }
+    }
+    // 获取账号信息
+    async fn get_account(&mut self) -> Result<Account, Error> {
+        let symbol_array: Vec<&str> = self.symbol.split("_").collect();
+        let coin = symbol_array[1].to_string().to_uppercase();
+        let res_data = self.request.get_account().await;
+        if res_data.code == 200 {
+            let res_data_array = res_data.data.as_array().unwrap();
+            for res_data_json in res_data_array.iter() {
+                if res_data_json["ccy"].as_str().unwrap() == coin {
+                    let frozen_balance= Decimal::from_str(res_data_json["frozen"].as_str().unwrap()).unwrap();
+                    let available_balance = Decimal::from_str(res_data_json["available"].as_str().unwrap()).unwrap();
+                    let balance = frozen_balance + available_balance;
+                    let result = Account {
+                        coin: symbol_array[1].to_string(),
+                        balance,
+                        available_balance,
+                        frozen_balance,
+                        stocks: Decimal::ZERO,
+                        available_stocks: Decimal::ZERO,
+                        frozen_stocks: Decimal::ZERO,
+                    };
+                    return Ok(result);
+                }
+            }
+        }
+        Err(Error::new(ErrorKind::Other, res_data.to_string()))
+    }
+
+    async fn get_spot_account(&mut self) -> Result<Vec<Account>, Error> {
+        Err(Error::new(ErrorKind::NotFound, "coinex_swap:该方法暂未实现".to_string()))
+    }
+
+    // 获取持仓信息
+    async fn get_position(&mut self) -> Result<Vec<Position>, Error> {
+        let symbol: String = self.symbol.replace("_", "").to_uppercase();
+        let ct_val = self.market.ct_val;
+        let res_data = self.request.get_position(symbol).await;
+        if res_data.code == 200 {
+            let res_data_json = res_data.data.as_array().unwrap();
+            let result = res_data_json.iter().map(|item| { format_position_item(item, ct_val) }).collect();
+            Ok(result)
+        } else {
+            Err(Error::new(ErrorKind::Other, res_data.to_string()))
+        }
+    }
+
+    // 获取所有持仓
+    async fn get_positions(&mut self) -> Result<Vec<Position>, Error> {
+        let res_data = self.request.get_user_position().await;
+        if res_data.code == 200 {
+            let res_data_json = res_data.data.as_array().unwrap();
+            let result = res_data_json.iter().map(|item| { format_position_item(item, Decimal::ONE) }).collect();
+            Ok(result)
+        } else {
+            Err(Error::new(ErrorKind::Other, res_data.to_string()))
+        }
+    }
+    // 获取市场行情
+    async fn get_ticker(&mut self) -> Result<Ticker, Error> {
+        let symbol: String = self.symbol.replace("_", "");
+        let res_data = self.request.get_ticker(symbol.clone()).await;
+        if res_data.code == 200 {
+            let res_data_json = res_data.data.as_array().unwrap();
+            let ticker_info = res_data_json.iter().find(|item| item["market"].as_str().unwrap() == symbol);
+            match ticker_info {
+                None => {
+                    error!("coinex_swap:获取Ticker信息错误!\nget_ticker:res_data={:?}", res_data);
+                    Err(Error::new(ErrorKind::Other, res_data.to_string()))
+                }
+                Some(value) => {
+                    let result = Ticker {
+                        time: chrono::Utc::now().timestamp_millis(),
+                        high: Decimal::from_str(value["high"].as_str().unwrap()).unwrap(),
+                        low: Decimal::from_str(value["low"].as_str().unwrap()).unwrap(),
+                        sell: Decimal::from_str(value["last"].as_str().unwrap()).unwrap(),
+                        buy: Decimal::from_str(value["last"].as_str().unwrap()).unwrap(),
+                        last: Decimal::from_str(value["last"].as_str().unwrap()).unwrap(),
+                        volume: Decimal::from_str(value["volume"].as_str().unwrap()).unwrap(),
+                    };
+                    Ok(result)
+                }
+            }
+        } else {
+            Err(Error::new(ErrorKind::Other, res_data.to_string()))
+        }
+    }
+
+    async fn get_ticker_symbol(&mut self, symbol_param: String) -> Result<Ticker, Error> {
+        let symbol: String = symbol_param.replace("_", "").to_uppercase();
+        let res_data = self.request.get_ticker(symbol.clone()).await;
+        if res_data.code == 200 {
+            let res_data_json = res_data.data.as_array().unwrap();
+            let ticker_info = res_data_json.iter().find(|item| item["contract"].as_str().unwrap() == symbol);
+            match ticker_info {
+                None => {
+                    error!("coinex_swap:获取Ticker信息错误!\nget_ticker:res_data={:?}", res_data);
+                    Err(Error::new(ErrorKind::Other, res_data.to_string()))
+                }
+                Some(value) => {
+                    let result = Ticker {
+                        time: chrono::Utc::now().timestamp_millis(),
+                        high: Decimal::from_str(value["high"].as_str().unwrap()).unwrap(),
+                        low: Decimal::from_str(value["low"].as_str().unwrap()).unwrap(),
+                        sell: Decimal::from_str(value["last"].as_str().unwrap()).unwrap(),
+                        buy: Decimal::from_str(value["last"].as_str().unwrap()).unwrap(),
+                        last: Decimal::from_str(value["last"].as_str().unwrap()).unwrap(),
+                        volume: Decimal::from_str(value["volume"].as_str().unwrap()).unwrap(),
+                    };
+                    Ok(result)
+                }
+            }
+        } else {
+            Err(Error::new(ErrorKind::Other, res_data.to_string()))
+        }
+    }
+
+    async fn get_market(&mut self) -> Result<Market, Error> {
+        let symbol_array: Vec<&str> = self.symbol.split("_").collect();
+        let symbol = format!("{}{}", symbol_array[0], symbol_array[1]);
+        let res_data = self.request.get_market_details(symbol.clone()).await;
+        if res_data.code == 200 {
+            let res_data_json = res_data.data.as_array().unwrap();
+            let market_info = res_data_json.iter().find(|item| item["market"].as_str().unwrap() == symbol);
+            match market_info {
+                None => {
+                    error!("coinex_swap:获取Market信息错误!\nget_market:res_data={:?}", res_data);
+                    Err(Error::new(ErrorKind::Other, res_data.to_string()))
+                }
+                Some(value) => {
+                    // 报价精度字符串
+                    let price_precision_i64 = value["quote_ccy_precision"].as_i64().unwrap();
+                    // 价格最小变动数值
+                    let tick_size = Decimal::new(1, 0) / Decimal::new(10i64.pow(price_precision_i64.to_u32().unwrap()), 0);
+                    // 报价精度
+                    let price_precision = Decimal::from_i64(price_precision_i64).unwrap();
+                    // 最小数量
+                    let min_qty = Decimal::from_str(value["min_amount"].as_str().unwrap()).unwrap();
+                    // 数量没有最大值
+                    let max_qty = Decimal::MAX;
+                    // 没有张数
+                    let ct_val = Decimal::ONE;
+
+                    let amount_size = min_qty * ct_val;
+                    let amount_precision = Decimal::from_u32(amount_size.scale()).unwrap();
+                    let min_notional = min_qty * ct_val;
+                    let max_notional = max_qty * ct_val;
+
+                    let result = Market {
+                        symbol: self.symbol.clone(),
+                        base_asset: symbol_array[0].to_string(),
+                        quote_asset: symbol_array[1].to_string(),
+                        tick_size,
+                        amount_size,
+                        price_precision,
+                        amount_precision,
+                        min_qty,
+                        max_qty,
+                        min_notional,
+                        max_notional,
+                        ct_val,
+                    };
+                    Ok(result)
+                }
+            }
+        } else {
+            Err(Error::new(ErrorKind::Other, res_data.to_string()))
+        }
+    }
+
+    async fn get_market_symbol(&mut self, symbol: String) -> Result<Market, Error> {
+        let symbol_upper = symbol.to_uppercase();
+        let symbol_array: Vec<&str> = symbol_upper.split("_").collect();
+        let symbol = format!("{}{}", symbol_array[0], symbol_array[1]);
+        let res_data = self.request.get_market_details(symbol.clone()).await;
+        if res_data.code == 200 {
+            let res_data_json = res_data.data.as_array().unwrap();
+            let market_info = res_data_json.iter().find(|item| item["name"].as_str().unwrap() == symbol.clone());
+            match market_info {
+                None => {
+                    error!("coinex_swap:获取Market信息错误!\nget_market:res_data={:?}", res_data);
+                    Err(Error::new(ErrorKind::Other, res_data.to_string()))
+                }
+                Some(value) => {
+                    let tick_size = Decimal::from_str(value["quote_ccy_precision"].as_str().unwrap()).unwrap();
+                    let min_qty = Decimal::from_str(&value["min_amount"].to_string()).unwrap();
+                    // 数量没有最大值
+                    let max_qty = Decimal::MAX;
+                    // 没有张数
+                    let ct_val = Decimal::ONE;
+
+                    let amount_size = min_qty * ct_val;
+                    let price_precision = Decimal::from_u32(tick_size.scale()).unwrap();
+                    let amount_precision = Decimal::from_u32(amount_size.scale()).unwrap();
+                    let min_notional = min_qty * ct_val;
+                    let max_notional = max_qty * ct_val;
+
+                    let result = Market {
+                        symbol: self.symbol.clone(),
+                        base_asset: symbol_array[0].to_string(),
+                        quote_asset: symbol_array[1].to_string(),
+                        tick_size,
+                        amount_size,
+                        price_precision,
+                        amount_precision,
+                        min_qty,
+                        max_qty,
+                        min_notional,
+                        max_notional,
+                        ct_val,
+                    };
+                    Ok(result)
+                }
+            }
+        } else {
+            Err(Error::new(ErrorKind::Other, res_data.to_string()))
+        }
+    }
+
+    // 获取订单详情
+    async fn get_order_detail(&mut self, order_id: &str, custom_id: &str) -> Result<Order, Error> {
+        let symbol = self.symbol.replace("_", "").to_uppercase();
+        let ct_val = self.market.ct_val;
+        let res_data;
+        if order_id != "" {
+            res_data = self.request.get_order_details(order_id.to_string(), symbol).await;
+        } else if custom_id != "" {
+            // 通过客户端id查询  只有未完成的订单才能查询出来
+            res_data = self.request.get_pending_order(format!("t-{}", custom_id)).await;
+        } else {
+            return Err(Error::new(ErrorKind::Other, format!("订单id和客户端id都为空,查询失败!order_id :{}, custom_id: {}", order_id, custom_id)));
+        }
+
+        if res_data.code == 200 {
+            let res_data_json: Value = res_data.data;
+            let mut result = format_order_item(res_data_json, ct_val, "");
+            result.custom_id = custom_id.to_string();
+            result.id = order_id.to_string();
+            Ok(result)
+        } else if res_data.code == -1 && res_data.message.contains("3103:order not exists") { // 未成交已取消的订单会报不存在
+            let mut order = Order::new();
+            order.id = order_id.to_string();
+            order.custom_id = custom_id.to_string();
+            order.status = "REMOVE".to_string();
+            Ok(order)
+        } else {
+            Err(Error::new(ErrorKind::Other, res_data.to_string()))
+        }
+    }
+
+    // 获取未完成订单列表
+    async fn get_orders_list(&mut self, status: &str) -> Result<Vec<Order>, Error> {
+        let symbol = self.symbol.replace("_", "").to_uppercase();
+        let ct_val = self.market.ct_val;
+        let status_order;
+        let res_data;
+        if status == "pending" {
+            res_data = self.request.get_pending_orders().await;
+            status_order = "open";
+        } else if status == "finish" {
+            res_data = self.request.get_finished_orders().await;
+            status_order = "filled";
+        }else{
+            return Err(Error::new(ErrorKind::Other, status));
+        }
+        if res_data.code == 200 {
+            let res_data_json = res_data.data.as_array().unwrap();
+            let order_info: Vec<_> = res_data_json.iter().filter(|item| item["market"].as_str().unwrap_or("") == symbol).collect();
+            let result = order_info.iter().map(|&item| format_order_item(item.clone(), ct_val, status_order)).collect();
+            Ok(result)
+        } else {
+            Err(Error::new(ErrorKind::Other, res_data.to_string()))
+        }
+    }
+
+    // 下单接口
+    async fn take_order(&mut self, custom_id: &str, origin_side: &str, price: Decimal, amount: Decimal) -> Result<Order, Error> {
+        let symbol = self.symbol.replace("_", "").to_uppercase();
+        let ct_val = self.market.ct_val;
+        let order_side;
+        let position_side;
+        let size = (amount / ct_val).floor();
+        match origin_side {
+            "kd" => {
+                position_side = "long";
+                order_side = "buy";
+            }
+            "pd" => {
+                position_side = "long";
+                order_side = "sell";
+            }
+            "kk" => {
+                position_side = "short";
+                order_side = "sell";
+            }
+            "pk" => {
+                position_side = "short";
+                order_side = "buy";
+            }
+            _ => {
+                error!("下单参数错误");
+                position_side = "error";
+                order_side = "error";
+            }
+        };
+        let res_data = self.request.order(symbol, position_side.to_string(), order_side.to_string(), size, price, custom_id.to_string()).await;
+        if res_data.code == 200 {
+            let res_data_json: Value = res_data.data;
+            // info!("take_order {}", res_data_json);
+            let result = format_order_item(res_data_json, ct_val, "open");
+            Ok(result)
+        } else {
+            // error!("take_order error {}", res_data.data);
+            Err(Error::new(ErrorKind::Other, res_data.to_string()))
+        }
+    }
+
+    async fn take_order_symbol(&mut self, symbol_y: String, ct_val: Decimal, custom_id: &str, origin_side: &str, price: Decimal, amount: Decimal) -> Result<Order, Error> {
+        let symbol = symbol_y.replace("_", "").to_uppercase();
+        let order_side;
+        let position_side;
+        let size = (amount / ct_val).floor();
+        match origin_side {
+            "kd" => {
+                position_side = "long";
+                order_side = "buy";
+            }
+            "pd" => {
+                position_side = "long";
+                order_side = "sell";
+            }
+            "kk" => {
+                position_side = "short";
+                order_side = "sell";
+            }
+            "pk" => {
+                position_side = "short";
+                order_side = "buy";
+            }
+            _ => {
+                error!("下单参数错误");
+                position_side = "error";
+                order_side = "error";
+            }
+        };
+        let res_data = self.request.order(symbol, position_side.to_string(), order_side.to_string(), size, price, custom_id.to_string()).await;
+        if res_data.code == 200 {
+            let res_data_json: Value = res_data.data;
+            // info!("take_order_symbol {}", res_data_json);
+            let result = format_order_item(res_data_json, ct_val, "open");
+            Ok(result)
+        } else {
+            Err(Error::new(ErrorKind::Other, res_data.to_string()))
+        }
+    }
+
+    // 撤销订单
+    async fn cancel_order(&mut self, order_id: &str, custom_id: &str) -> Result<Order, Error> {
+        let symbol = self.symbol.replace("_", "").to_uppercase();
+
+        let ct_val = self.market.ct_val;
+        let res_data = self.request.cancel_order(symbol, order_id, custom_id).await;
+        if res_data.code == 200 {
+            let res_data_json: Value = res_data.data;
+            // info!("cancel_order {} order_id {} custom_id {}", res_data_json, order_id, custom_id);
+            let mut result = format_order_item(res_data_json, ct_val, "canceled");
+            result.custom_id = custom_id.to_string();
+            result.id = order_id.to_string();
+            Ok(result)
+        } else {
+            let message = format!("撤单HTTP请求失败  order_id: {}, custom_id: {}, res_data: {:?}", order_id, custom_id, res_data);
+            Err(Error::new(ErrorKind::Other, message))
+        }
+    }
+    // 批量撤销订单
+    async fn cancel_orders(&mut self) -> Result<Vec<Order>, Error> {
+        let symbol = self.symbol.replace("_", "").to_uppercase();
+        let res_data = self.request.cancel_order_all(symbol).await;
+        if res_data.code == 200 {
+            // let res_data_json = res_data.data.as_array().unwrap();
+            // info!("cancel_orders {:?}", res_data_json);
+            let result = vec![];
+            Ok(result)
+        } else {
+            Err(Error::new(ErrorKind::Other, res_data.to_string()))
+        }
+    }
+
+    async fn cancel_orders_all(&mut self) -> Result<Vec<Order>, Error> {
+        let ct_val = self.market.ct_val;
+        let orders_res_data = self.request.get_pending_orders().await;
+        let status = "canceled";
+        if orders_res_data.code == 200 {
+            let mut result = vec![];
+            let orders_res_data_json = orders_res_data.data.as_array().unwrap();
+            for order in orders_res_data_json {
+                let cancel_res_data = self.request.cancel_order_all( order["market"].as_str().unwrap().to_string()).await;
+                // info!("cancel_orders_all {:?}", cancel_res_data);
+                if cancel_res_data.code == 200 {
+                    result.push(format_order_item(order.clone(), ct_val, status))
+                } else {
+                    return Err(Error::new(ErrorKind::Other, cancel_res_data.to_string()));
+                }
+            }
+            Ok(result)
+        } else {
+            Err(Error::new(ErrorKind::Other, orders_res_data.to_string()))
+        }
+    }
+
+    async fn take_stop_loss_order(&mut self, _stop_price: Decimal, _price: Decimal, _side: &str) -> Result<Value, Error> {
+        Err(Error::new(ErrorKind::NotFound, "coin_ex:该交易所暂未实现自动订单下单".to_string()))
+    }
+
+    async fn cancel_stop_loss_order(&mut self, _order_id: &str) -> Result<Value, Error> {
+        Err(Error::new(ErrorKind::NotFound, "coin_ex:该交易所暂未实现取消自动订单".to_string()))
+    }
+
+    // 设置持仓模式
+    async fn set_dual_mode(&mut self, _coin: &str, _is_dual_mode: bool) -> Result<String, Error> {
+        Err(Error::new(ErrorKind::NotFound, "coin_ex:该交易所只允许单向持仓,无法设置持仓模式".to_string()))
+    }
+
+    // 更新杠杆
+    async fn set_dual_leverage(&mut self, leverage: &str) -> Result<String, Error> {
+        let leverage_int = leverage.parse::<i32>().unwrap();
+        let symbol = self.symbol.replace("_", "").to_uppercase();
+        let res_data = self.request.setting_dual_leverage(symbol, leverage_int).await;
+        if res_data.code == 200 {
+            let res_data_str = &res_data.data;
+            let result = res_data_str.clone();
+            Ok(result.to_string())
+        } else {
+            Err(Error::new(ErrorKind::Other, res_data.to_string()))
+        }
+    }
+
+    async fn set_auto_deposit_status(&mut self, _status: bool) -> Result<String, Error> { Err(Error::new(ErrorKind::NotFound, "Coinex:该交易所方法未实现".to_string())) }
+
+    async fn wallet_transfers(&mut self, _coin: &str, _from: &str, _to: &str, _amount: Decimal) -> Result<String, Error> {
+        Err(Error::new(ErrorKind::NotFound, "Coinex wallet_transfers:该交易所方法未实现".to_string()))
+    }
+}
+
+pub fn format_position_item(position: &Value, ct_val: Decimal) -> Position {
+    let position_mode = match position["side"].as_str().unwrap_or("") {
+        "long" => PositionModeEnum::Long,
+        "short" => PositionModeEnum::Short,
+        _ => {
+            error!("coinex_swap:格式化持仓模式错误!\nformat_position_item:position={:?}", position);
+            panic!("coinex_swap:格式化持仓模式错误!\nformat_position_item:position={:?}", position)
+        }
+    };
+    let size = Decimal::from_str(&position["open_interest"].as_str().unwrap()).unwrap();
+    let amount = size * ct_val;
+    Position {
+        symbol: position["market"].as_str().unwrap_or("").parse().unwrap(),
+        margin_level: Decimal::from_str(position["leverage"].as_str().unwrap()).unwrap(),
+        amount,
+        frozen_amount: Decimal::ZERO,
+        price: Decimal::from_str(position["avg_entry_price"].as_str().unwrap()).unwrap(),
+        profit: Decimal::from_str(position["unrealized_pnl"].as_str().unwrap()).unwrap(),
+        position_mode,
+        margin: Decimal::from_str(position["ath_margin_size"].as_str().unwrap()).unwrap(),
+    }
+}
+
+pub fn format_order_item(order: Value, ct_val: Decimal, status :&str) -> Order {
+    if order.is_null() || (order.is_array() && order.as_array().unwrap().is_empty()){
+        return Order {
+            id: "".to_string(),
+            custom_id: "".to_string(),
+            price: Decimal::ZERO,
+            amount: Decimal::ZERO,
+            deal_amount: Decimal::ZERO,
+            avg_price: Decimal::ZERO,
+            status: "REMOVE".to_string(),
+            order_type: "limit".to_string(),
+        }
+    }
+    let size;
+    match order["amount"].as_str() {
+        Some(val) => {
+            size = Decimal::from_str(val).unwrap()
+        },
+        None => {
+            error!("coinex_swap:格式化订单大小错误!\nformat_order_item:order={:?} status={}", order, status);
+            panic!("coinex_swap:格式化订单大小失败,退出程序!");
+        }
+    }
+    // info!("order {}", order);
+    // 通过客户端id查询订单 只能查出未完成订单(没有状态字段)
+    let status = order["status"].as_str().unwrap_or(status);
+    let text = order["client_id"].as_str().unwrap_or("");
+
+    let deal_amount = Decimal::from_str(&order["filled_amount"].as_str().unwrap()).unwrap();
+    let filled_value = Decimal::from_str(&order["filled_value"].as_str().unwrap()).unwrap();
+
+    let amount = size * ct_val;
+    let mut avg_price = Decimal::ZERO;
+    if deal_amount != Decimal::ZERO{
+        avg_price = filled_value/deal_amount;
+    }
+    let custom_status = if status == "filled" || status == "canceled" {
+        "REMOVE".to_string()
+    } else if status == "open" || status == "part_filled" || status == "part_canceled" {
+        "NEW".to_string()
+    } else {
+        error!("coinex_swap:格式化订单状态错误!\nformat_order_item:order={:?}", order);
+        "NULL".to_string()
+    };
+    let rst_order = Order {
+        id: order["order_id"].to_string(),
+        custom_id: text.replace("t-my-custom-id_", "").replace("t-", ""),
+        price: Decimal::from_str(order["price"].as_str().unwrap()).unwrap(),
+        amount,
+        deal_amount,
+        avg_price,
+        status: custom_status,
+        order_type: "limit".to_string(),
+    };
+    return rst_order;
+}

+ 194 - 0
standard/src/coinex_swap_handle.rs

@@ -0,0 +1,194 @@
+use std::str::FromStr;
+use rust_decimal::Decimal;
+use rust_decimal::prelude::FromPrimitive;
+use exchanges::response_base::ResponseData;
+use crate::{Trade};
+
+// 处理账号信息
+// pub fn handle_account_info(res_data: &ResponseData, symbol: &String) -> Account {
+//     let res_data_json = res_data.data["balance_list"].as_array().unwrap();
+//     format_account_info(res_data_json, symbol)
+// }
+
+// pub fn format_account_info(data: &Vec<Value>, symbol: &String) -> Account {
+//     let symbol_upper = symbol.to_uppercase();
+//     let symbol_array: Vec<&str> = symbol_upper.split("_").collect();
+//     let balance_info = data.iter().find(|&item| item["ccy"].as_str().unwrap().contains(symbol_array[1]));
+//
+//     match balance_info {
+//         None => {
+//             error!("Coinex:格式化账号信息错误!\nformat_account_info: data={:?}", data);
+//             panic!("Coinex:格式化账号信息错误!\nformat_account_info: data={:?}", data)
+//         }
+//         Some(value) => {
+//             let frozen_balance= Decimal::from_str(&value["frozen"].as_str().unwrap()).unwrap();
+//             let available_balance = Decimal::from_str(&value["available"].as_str().unwrap()).unwrap();
+//             let margin = Decimal::from_str(&value["margin"].as_str().unwrap()).unwrap();
+//             let profit_unreal = Decimal::from_str(&value["unrealized_pnl"].as_str().unwrap()).unwrap();
+//             let balance = frozen_balance + available_balance + margin + profit_unreal;
+//             Account {
+//                 coin: symbol_array[1].to_string(),
+//                 balance,
+//                 available_balance: Decimal::ZERO,
+//                 frozen_balance: Decimal::ZERO,
+//                 stocks: Decimal::ZERO,
+//                 available_stocks: Decimal::ZERO,
+//                 frozen_stocks: Decimal::ZERO,
+//             }
+//         }
+//     }
+// }
+
+// 处理position信息
+// pub fn handle_position(res_data: &ResponseData, ct_val: &Decimal) -> Vec<Position> {
+//     let res_data_json = &res_data.data["position"];
+//     let position =  format_position_item(res_data_json, ct_val);
+//     vec![position]
+// }
+
+// pub fn format_position_item(position: &Value, ct_val: &Decimal) -> Position {
+//     let position_mode = match position["side"].as_str().unwrap_or("") {
+//         "long" => PositionModeEnum::Long,
+//         "short" => PositionModeEnum::Short,
+//         _ => {
+//             error!("Coinex:格式化持仓模式错误!\nformat_position_item:position={:?}", position);
+//             panic!("Coinex:格式化持仓模式错误!\nformat_position_item:position={:?}", position)
+//         }
+//     };
+//     let size = Decimal::from_str(&position["open_interest"].as_str().unwrap()).unwrap();
+//     let amount = size * ct_val;
+//     Position {
+//         symbol: position["market"].as_str().unwrap().to_string(),
+//         margin_level: Decimal::from_str(&position["leverage"].as_str().unwrap()).unwrap(),
+//         amount,
+//         frozen_amount: Decimal::ZERO,
+//         price: Decimal::from_str(&position["avg_entry_price"].as_str().unwrap()).unwrap(),
+//         profit: Decimal::from_str(&position["unrealized_pnl"].as_str().unwrap()).unwrap(),
+//         position_mode,
+//         margin: Decimal::from_str(&position["ath_margin_size"].as_str().unwrap()).unwrap(),
+//     }
+// }
+
+// 处理order信息
+// pub fn handle_order(res_data: ResponseData, ct_val: Decimal) -> SpecialOrder {
+//     let status = res_data.data["event"].as_str().unwrap();
+//     let res_data_json = &res_data.data["order"];
+//     let order_info = format_order_item(res_data_json, ct_val, status);
+//
+//     SpecialOrder {
+//         name: res_data.tag,
+//         order: vec![order_info],
+//     }
+// }
+
+// pub fn format_order_item(order: &Value, ct_val: Decimal, status: &str) -> Order {
+//     let text = order["client_id"].as_str().unwrap_or("");
+//     let size = Decimal::from_str(order["amount"].as_str().unwrap()).unwrap();
+//     let left = Decimal::from_str(order["unfilled_amount"].as_str().unwrap()).unwrap();
+//     // 已成交量
+//     let filled_amount = size - left;
+//     // 成交额
+//     let filled_value = Decimal::from_str(order["filled_value"].as_str().unwrap()).unwrap();
+//     // 成交均价
+//     let mut avg_price = Decimal::ZERO;
+//     if filled_amount > Decimal::ZERO{
+//         avg_price = filled_value/filled_amount;
+//     }
+//     let amount = size * ct_val;
+//     let deal_amount = filled_amount * ct_val;
+//     let custom_status = if status == "finish" { "REMOVE".to_string() } else if status == "put" || status == "update" { "NEW".to_string() } else {
+//         "NULL".to_string()
+//     };
+//     let rst_order = Order {
+//         id: order["order_id"].to_string(),
+//         custom_id: text.replace("t-my-custom-id_", "").replace("t-", ""),
+//         price: Decimal::from_str(order["price"].as_str().unwrap()).unwrap(),
+//         amount,
+//         deal_amount,
+//         avg_price,
+//         status: custom_status,
+//         order_type: "limit".to_string(),
+//     };
+//     return rst_order;
+// }
+
+pub fn format_trade_items(response: &ResponseData) -> Vec<Trade> {
+    let symbol = response.data["market"].as_str().unwrap().to_string().replace("USDT", "_USDT");
+    let result = response.data["deal_list"].as_array().unwrap();
+    let mut trades = vec![];
+
+    for item in result {
+        let id = format!("{}", item["deal_id"].as_i64().unwrap());
+        let time = Decimal::from_i64(item["created_at"].as_i64().unwrap()).unwrap();
+        let mut size = Decimal::from_str(item["amount"].as_str().unwrap()).unwrap();
+        let side = item["side"].as_str().unwrap().to_string();
+        size = match side.as_str() {
+            "buy" => {
+                size
+            }
+            "sell" => {
+                -size
+            }
+            _ => {
+                let msg = format!("未知的方向:{}", item.to_string());
+                panic!("{}", msg)
+            }
+        };
+        let price = Decimal::from_str(item["price"].as_str().unwrap().to_string().as_str()).unwrap();
+
+        let mut trade = Trade {
+            id,
+            time,
+            size,
+            price,
+            symbol: symbol.clone(),
+        };
+
+        if item["side"].as_str().unwrap().eq("sell") {
+            trade.size = trade.size * Decimal::NEGATIVE_ONE;
+        }
+
+        trades.push(trade)
+    }
+
+    return trades
+}
+
+// 处理特殊Ticket信息
+// pub fn handle_ticker(res_data: &ResponseData) -> SpecialDepth {
+//     let depth = &res_data.data["depth"];
+//
+//     let bp = Decimal::from_str(depth["bids"][0][0].as_str().unwrap()).unwrap();
+//     let bq = Decimal::from_str(depth["bids"][0][1].as_str().unwrap()).unwrap();
+//     let ap = Decimal::from_str(depth["asks"][0][0].as_str().unwrap()).unwrap();
+//     let aq = Decimal::from_str(depth["asks"][0][1].as_str().unwrap()).unwrap();
+//     let mp = (bp + ap) * dec!(0.5);
+//     let t = Decimal::from_i64(depth.get("checksum").unwrap().as_i64().unwrap_or(0i64)).unwrap();
+//     let create_at = depth.get("updated_at").unwrap().as_i64().unwrap() * 1000;
+//
+//     let ticker_info = SpecialTicker { sell: ap, buy: bp, mid_price: mp, t, create_at };
+//     let depth_info = vec![bp, bq, ap, aq];
+//
+//     SpecialDepth {
+//         name: (*res_data).tag.clone(),
+//         depth: depth_info,
+//         ticker: ticker_info,
+//         t,
+//         create_at,
+//     }
+// }
+
+// pub fn format_depth_items(value: &Value) -> Vec<MarketOrder> {
+//     if value.is_null() {
+//         return vec![];
+//     }
+//     let mut depth_items: Vec<MarketOrder> = vec![];
+//     for value in value.as_array().unwrap() {
+//         let values = value.as_array().unwrap();
+//         depth_items.push(MarketOrder {
+//             price: Decimal::from_str(values[0].as_str().unwrap()).unwrap(),
+//             amount: Decimal::from_str(values[1].as_str().unwrap()).unwrap(),
+//         })
+//     }
+//     return depth_items;
+// }

+ 5 - 0
standard/src/exchange.rs

@@ -7,6 +7,7 @@ use crate::gate_swap::GateSwap;
 use crate::kucoin_swap::KucoinSwap;
 use crate::bybit_swap::BybitSwap;
 use crate::bitget_swap::BitgetSwap;
+use crate::coinex_swap::CoinexSwap;
 
 /// 交易所交易模式枚举
 /// - `BinanceSwap`: Binance交易所期货;
@@ -17,6 +18,7 @@ use crate::bitget_swap::BitgetSwap;
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub enum ExchangeEnum {
     BinanceSwap,
+    CoinexSwap,
     // BinanceSpot,
     GateSwap,
     // GateSpot,
@@ -97,6 +99,9 @@ impl Exchange {
             ExchangeEnum::BybitSwap => {
                 Box::new(BybitSwap::new(symbol, is_colo, params, order_sender, error_sender).await)
             }
+            ExchangeEnum::CoinexSwap => {
+                Box::new(CoinexSwap::new(symbol, is_colo, params, order_sender, error_sender).await)
+            }
         }
     }
 }

+ 20 - 2
standard/src/exchange_struct_handler.rs

@@ -1,11 +1,11 @@
-use crate::{Record, Ticker, Trade};
 use std::str::FromStr;
 use rust_decimal::{Decimal};
 use rust_decimal::prelude::FromPrimitive;
 use tracing::{error};
 use exchanges::response_base::ResponseData;
 use crate::exchange::ExchangeEnum;
-use crate::{binance_swap_handle, gate_swap_handle, bybit_swap_handle, bitget_swap_handle, kucoin_handle, Depth};
+use crate::{binance_swap_handle, gate_swap_handle, bybit_swap_handle, bitget_swap_handle, coinex_swap_handle, kucoin_handle};
+use crate::{Record, Ticker, Trade, Depth};
 use crate::{Account, OrderBook, Position, SpecialOrder};
 
 #[allow(dead_code)]
@@ -43,6 +43,9 @@ impl ExchangeStructHandler {
                 depth_bids = kucoin_handle::format_depth_items(res_data.data["bids"].clone());
                 t = Decimal::from_str(&res_data.data["ts"].to_string()).unwrap();
             }
+            ExchangeEnum::CoinexSwap => {
+                panic!("还不支持CoinexSwap的order_book_handle")
+            }
             // ExchangeEnum::KucoinSpot => {
             //     depth_asks = kucoin_spot_handle::format_depth_items(res_data_json["asks"].clone());
             //     depth_bids = kucoin_spot_handle::format_depth_items(res_data_json["bids"].clone());
@@ -92,6 +95,9 @@ impl ExchangeStructHandler {
             ExchangeEnum::BinanceSwap => {
                 binance_swap_handle::format_trade_items(&res_data)
             },
+            ExchangeEnum::CoinexSwap => {
+                coinex_swap_handle::format_trade_items(&res_data)
+            },
             _ => {
                 error!("未找到该交易所!trades_handle: {:?}", exchange);
                 panic!("未找到该交易所!trades_handle: {:?}", exchange);
@@ -108,6 +114,9 @@ impl ExchangeStructHandler {
                 // binance_swap_handle::handle_book_ticker(res_data)
                 panic!("BinanceSwap 未实现格式化");
             }
+            ExchangeEnum::CoinexSwap => {
+                panic!("还不支持CoinexSwap的book_ticker_handle")
+            }
             ExchangeEnum::GateSwap => {
                 // gate_swap_handle::handle_book_ticker(res_data);
                 panic!("GateSwap 未实现格式化");
@@ -150,6 +159,9 @@ impl ExchangeStructHandler {
             ExchangeEnum::BybitSwap => {
                 panic!("BybitSwap records_handle 未实现格式化");
             }
+            ExchangeEnum::CoinexSwap => {
+                panic!("还不支持CoinexSwap的records_handle")
+            }
             // ExchangeEnum::KucoinSpot => {
             //     kucoin_spot_handle::handle_special_ticker(res_data)
             // }
@@ -220,6 +232,9 @@ impl ExchangeStructHandler {
             ExchangeEnum::KucoinSwap => {
                 kucoin_handle::handle_position(res_data, ct_val)
             }
+            ExchangeEnum::CoinexSwap => {
+                panic!("还不支持CoinexSwap的position_handle")
+            }
             // ExchangeEnum::KucoinSpot => {
             //     error!("暂未提供此交易所方法!handle_position:{:?}", exchange);
             //     panic!("暂未提供此交易所方法!handle_position:{:?}", exchange);
@@ -246,6 +261,9 @@ impl ExchangeStructHandler {
                 error!("暂未提供此交易所方法!handle_order:{:?}", exchange);
                 panic!("暂未提供此交易所方法!handle_order:{:?}", exchange);
             }
+            ExchangeEnum::CoinexSwap => {
+                panic!("还不支持CoinexSwap的order_handle")
+            }
             ExchangeEnum::GateSwap => {
                 gate_swap_handle::handle_order(res_data, ct_val)
             }

+ 2 - 0
standard/src/lib.rs

@@ -33,6 +33,8 @@ mod bybit_swap;
 mod bybit_swap_handle;
 mod bitget_swap;
 mod bitget_swap_handle;
+mod coinex_swap;
+mod coinex_swap_handle;
 
 /// 持仓模式枚举
 /// - `Both`:单持仓方向