|
|
@@ -1,393 +1,393 @@
|
|
|
-use std::collections::BTreeMap;
|
|
|
-use std::sync::Arc;
|
|
|
-use std::sync::atomic::AtomicBool;
|
|
|
-
|
|
|
-use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
|
|
|
-use tokio::sync::Mutex;
|
|
|
-use tokio_tungstenite::tungstenite::{Error, Message};
|
|
|
-use tracing::{error, info, trace};
|
|
|
-
|
|
|
-use crate::kucoin_swap_rest::KucoinSwapRest;
|
|
|
-use crate::response_base::ResponseData;
|
|
|
-use crate::socket_tool::{AbstractWsMode, HeartbeatType};
|
|
|
-
|
|
|
-//类型
|
|
|
-pub enum KucoinSwapWsType {
|
|
|
- Public,
|
|
|
- Private,
|
|
|
-}
|
|
|
-
|
|
|
-
|
|
|
-#[derive(Debug)]
|
|
|
-#[derive(Clone)]
|
|
|
-pub struct KucoinSwapWsParam {
|
|
|
- pub token: String,
|
|
|
- pub ws_url: String,
|
|
|
- pub ws_ping_interval: i64,
|
|
|
- pub ws_ping_timeout: i64,
|
|
|
- pub is_ok_subscribe: bool,
|
|
|
-}
|
|
|
-
|
|
|
-//订阅频道
|
|
|
-#[derive(Clone)]
|
|
|
-pub enum KucoinSwapSubscribeType {
|
|
|
- PuContractMarketLevel2Depth50,
|
|
|
- //买卖盘 快照,asks:卖,bids:买入
|
|
|
- PuContractMarketExecution,
|
|
|
- PuContractMarkettickerV2,
|
|
|
-
|
|
|
- PrContractAccountWallet,
|
|
|
- PrContractPosition,
|
|
|
- PrContractMarketTradeOrdersSys,
|
|
|
- PrContractMarketTradeOrders,
|
|
|
-}
|
|
|
-
|
|
|
-//账号信息
|
|
|
-#[derive(Clone, Debug)]
|
|
|
-pub struct KucoinSwapLogin {
|
|
|
- pub access_key: String,
|
|
|
- pub secret_key: String,
|
|
|
- pub pass_key: String,
|
|
|
-}
|
|
|
-
|
|
|
-#[derive(Clone)]
|
|
|
-#[allow(dead_code)]
|
|
|
-pub struct KucoinSwapWs {
|
|
|
- //类型
|
|
|
- label: String,
|
|
|
- //地址
|
|
|
- address_url: String,
|
|
|
- //账号
|
|
|
- login_param: Option<KucoinSwapLogin>,
|
|
|
- //登陆数据
|
|
|
- ws_param: KucoinSwapWsParam,
|
|
|
- //币对
|
|
|
- symbol_s: Vec<String>,
|
|
|
- //订阅
|
|
|
- subscribe_types: Vec<KucoinSwapSubscribeType>,
|
|
|
- //心跳间隔
|
|
|
- heartbeat_time: u64,
|
|
|
-}
|
|
|
-
|
|
|
-impl KucoinSwapWs {
|
|
|
- /*******************************************************************************************************/
|
|
|
- /*****************************************获取一个对象****************************************************/
|
|
|
- /*******************************************************************************************************/
|
|
|
- pub async fn new(is_colo: bool, login_param: Option<KucoinSwapLogin>, ws_type: KucoinSwapWsType) -> KucoinSwapWs {
|
|
|
- return Self::new_label("default-KucoinSwapWs".to_string(), is_colo, login_param, ws_type).await;
|
|
|
- }
|
|
|
- pub async fn new_label(label: String, is_colo: bool, login_param: Option<KucoinSwapLogin>, ws_type: KucoinSwapWsType) -> KucoinSwapWs {
|
|
|
- /*******公共频道-私有频道数据组装*/
|
|
|
- let mut ws_param = KucoinSwapWsParam {
|
|
|
- token: "".to_string(),
|
|
|
- ws_url: "".to_string(),
|
|
|
- ws_ping_interval: 0,
|
|
|
- ws_ping_timeout: 0,
|
|
|
- is_ok_subscribe: false,
|
|
|
- };
|
|
|
-
|
|
|
- /*******公共频道-私有频道数据组装*/
|
|
|
- let res_data = Self::get_rul_token(ws_type, login_param.clone()).await;
|
|
|
- let address_url = match res_data {
|
|
|
- Ok(param) => {
|
|
|
- ws_param = param;
|
|
|
- format!("{}?token={}", ws_param.ws_url, ws_param.token)
|
|
|
- }
|
|
|
- Err(error) => {
|
|
|
- error!("-链接地址等参数错误:{:?}", error);
|
|
|
- "".to_string()
|
|
|
- }
|
|
|
- };
|
|
|
-
|
|
|
- if is_colo {
|
|
|
- info!("开启高速(未配置,走普通:{})通道",address_url);
|
|
|
- } else {
|
|
|
- info!("走普通通道:{}",address_url);
|
|
|
- }
|
|
|
-
|
|
|
- KucoinSwapWs {
|
|
|
- label,
|
|
|
- address_url,
|
|
|
- login_param,
|
|
|
- ws_param,
|
|
|
- symbol_s: vec![],
|
|
|
- subscribe_types: vec![],
|
|
|
- heartbeat_time: 1000 * 18,
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /*******************************************************************************************************/
|
|
|
- /*****************************************订阅函数********************************************************/
|
|
|
- /*******************************************************************************************************/
|
|
|
- //根据当前类型获取对应的频道 地址 与 token
|
|
|
- async fn get_rul_token(ws_type: KucoinSwapWsType, login_param: Option<KucoinSwapLogin>) -> Result<KucoinSwapWsParam, reqwest::Error> {
|
|
|
- let mut kucoin_exc = KucoinSwapRest::new(false, match login_param {
|
|
|
- None => {
|
|
|
- let btree_map: BTreeMap<String, String> = BTreeMap::new();
|
|
|
- btree_map
|
|
|
- }
|
|
|
- Some(d) => {
|
|
|
- let mut btree_map: BTreeMap<String, String> = BTreeMap::new();
|
|
|
- btree_map.insert("access_key".to_string(), d.access_key);
|
|
|
- btree_map.insert("secret_key".to_string(), d.secret_key);
|
|
|
- btree_map.insert("pass_key".to_string(), d.pass_key);
|
|
|
- btree_map
|
|
|
- }
|
|
|
- });
|
|
|
-
|
|
|
-
|
|
|
- let res_data = match ws_type {
|
|
|
- KucoinSwapWsType::Public => {
|
|
|
- kucoin_exc.get_public_token().await
|
|
|
- }
|
|
|
- KucoinSwapWsType::Private => {
|
|
|
- kucoin_exc.get_private_token().await
|
|
|
- }
|
|
|
- };
|
|
|
-
|
|
|
- trace!("kucoin-swap-rest 获取ws连接地址:{:?}",res_data);
|
|
|
-
|
|
|
- if res_data.code == "200" {
|
|
|
- let mut ws_url = "".to_string();
|
|
|
- let mut ws_token = "".to_string();
|
|
|
- let mut ws_ping_interval: i64 = 0;
|
|
|
- let mut ws_ping_timeout: i64 = 0;
|
|
|
-
|
|
|
-
|
|
|
- //数据解析
|
|
|
- let parsed_json: serde_json::Value = serde_json::from_str(res_data.data.as_str()).unwrap();
|
|
|
- if let Some(value) = parsed_json.get("token") {
|
|
|
- let formatted_value = match value {
|
|
|
- serde_json::Value::String(s) => s.clone(),
|
|
|
- _ => value.to_string()
|
|
|
- };
|
|
|
- ws_token = format!("{}", formatted_value);
|
|
|
- }
|
|
|
- if let Some(endpoint) = parsed_json["instanceServers"][0]["endpoint"].as_str() {
|
|
|
- ws_url = format!("{}", endpoint);
|
|
|
- }
|
|
|
- if let Some(ping_interval) = parsed_json["instanceServers"][0]["pingInterval"].as_i64() {
|
|
|
- ws_ping_interval = ping_interval;
|
|
|
- }
|
|
|
- if let Some(ping_timeout) = parsed_json["instanceServers"][0]["pingTimeout"].as_i64() {
|
|
|
- ws_ping_timeout = ping_timeout;
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- Ok(KucoinSwapWsParam { ws_url, token: ws_token, ws_ping_interval, ws_ping_timeout, is_ok_subscribe: false })
|
|
|
- } else {
|
|
|
- error!("公共/私有-频道获取失败:{:?}", res_data);
|
|
|
- panic!("公共/私有-频道获取失败:{:?}", res_data);
|
|
|
- }
|
|
|
- }
|
|
|
- //手动添加订阅信息
|
|
|
- pub fn set_subscribe(&mut self, subscribe_types: Vec<KucoinSwapSubscribeType>) {
|
|
|
- self.subscribe_types.extend(subscribe_types);
|
|
|
- }
|
|
|
- //手动添加币对
|
|
|
- pub fn set_symbols(&mut self, mut b_array: Vec<String>) {
|
|
|
- for symbol in b_array.iter_mut() {
|
|
|
- // 大写
|
|
|
- *symbol = symbol.to_uppercase();
|
|
|
- // 字符串替换
|
|
|
- *symbol = symbol.replace("_", "");
|
|
|
- *symbol = symbol.replace("-", "");
|
|
|
- }
|
|
|
- self.symbol_s = b_array;
|
|
|
- }
|
|
|
- fn contains_pr(&self) -> bool {
|
|
|
- for t in self.subscribe_types.clone() {
|
|
|
- if match t {
|
|
|
- KucoinSwapSubscribeType::PuContractMarketLevel2Depth50 => false,
|
|
|
- KucoinSwapSubscribeType::PuContractMarketExecution => false,
|
|
|
- KucoinSwapSubscribeType::PuContractMarkettickerV2 => false,
|
|
|
-
|
|
|
- KucoinSwapSubscribeType::PrContractAccountWallet => true,
|
|
|
- KucoinSwapSubscribeType::PrContractPosition => true,
|
|
|
- KucoinSwapSubscribeType::PrContractMarketTradeOrdersSys => true,
|
|
|
- KucoinSwapSubscribeType::PrContractMarketTradeOrders => true,
|
|
|
- } {
|
|
|
- return true;
|
|
|
- }
|
|
|
- }
|
|
|
- false
|
|
|
- }
|
|
|
- /*******************************************************************************************************/
|
|
|
- /*****************************************工具函数********************************************************/
|
|
|
- /*******************************************************************************************************/
|
|
|
- //订阅枚举解析
|
|
|
- pub fn enum_to_string(symbol: String, subscribe_type: KucoinSwapSubscribeType) -> serde_json::Value {
|
|
|
- match subscribe_type {
|
|
|
- KucoinSwapSubscribeType::PuContractMarketLevel2Depth50 => {//level2
|
|
|
- serde_json::json!({
|
|
|
- "topic": format!("/contractMarket/level2Depth50:{}", symbol),
|
|
|
- "type": "subscribe",
|
|
|
- "response": true
|
|
|
- })
|
|
|
- }
|
|
|
- KucoinSwapSubscribeType::PuContractMarketExecution => {//match
|
|
|
- serde_json::json!({
|
|
|
- "topic": format!("/contractMarket/execution:{}", symbol),
|
|
|
- "type": "subscribe",
|
|
|
- "response": true
|
|
|
- })
|
|
|
- }
|
|
|
- KucoinSwapSubscribeType::PuContractMarkettickerV2 => {//tickerV2
|
|
|
- serde_json::json!({
|
|
|
- "topic": format!("/contractMarket/tickerV2:{}", symbol),
|
|
|
- "type": "subscribe",
|
|
|
- "response": true
|
|
|
- })
|
|
|
- }
|
|
|
- KucoinSwapSubscribeType::PrContractAccountWallet => {//orderMargin.change
|
|
|
- serde_json::json!({
|
|
|
- "type": "subscribe",
|
|
|
- "topic": "/contractAccount/wallet",
|
|
|
- "privateChannel":true,
|
|
|
- "response":true,
|
|
|
- })
|
|
|
- }
|
|
|
- KucoinSwapSubscribeType::PrContractPosition => {//position.change
|
|
|
- serde_json::json!({
|
|
|
- "type": "subscribe",
|
|
|
- "topic": format!("/contract/position:{}", symbol),
|
|
|
- "privateChannel":true,
|
|
|
- "response":true,
|
|
|
- })
|
|
|
- }
|
|
|
- KucoinSwapSubscribeType::PrContractMarketTradeOrdersSys => {//orderChange
|
|
|
- serde_json::json!({
|
|
|
- "type": "subscribe",
|
|
|
- "topic": format!("/contractMarket/tradeOrders"),
|
|
|
- "privateChannel":true,
|
|
|
- "response":true,
|
|
|
- })
|
|
|
- }
|
|
|
- KucoinSwapSubscribeType::PrContractMarketTradeOrders => {//symbolOrderChange
|
|
|
- serde_json::json!({
|
|
|
- "type": "subscribe",
|
|
|
- "topic": format!("/contractMarket/tradeOrders:{}", symbol),
|
|
|
- "privateChannel":true,
|
|
|
- "response":true,
|
|
|
- })
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- //订阅信息生成
|
|
|
- pub fn get_subscription(&self) -> Vec<String> {
|
|
|
- let mut array = vec![];
|
|
|
- for symbol in &self.symbol_s {
|
|
|
- for subscribe_type in &self.subscribe_types {
|
|
|
- let ty_str = Self::enum_to_string(symbol.clone(), subscribe_type.clone());
|
|
|
- array.push(ty_str.to_string());
|
|
|
- }
|
|
|
- }
|
|
|
- array
|
|
|
- }
|
|
|
- /*******************************************************************************************************/
|
|
|
- /*****************************************socket基本*****************************************************/
|
|
|
- /*******************************************************************************************************/
|
|
|
- //链接
|
|
|
- pub async fn ws_connect_async(&mut self,
|
|
|
- bool_v1: Arc<AtomicBool>,
|
|
|
- write_tx_am: &Arc<Mutex<UnboundedSender<Message>>>,
|
|
|
- write_rx: UnboundedReceiver<Message>,
|
|
|
- read_tx: UnboundedSender<ResponseData>,
|
|
|
- ) -> Result<(), Error>
|
|
|
- {
|
|
|
- let login_is = self.contains_pr();
|
|
|
- let subscription = self.get_subscription();
|
|
|
- let address_url = self.address_url.clone();
|
|
|
- let label = self.label.clone();
|
|
|
- let heartbeat_time = self.ws_param.ws_ping_interval.clone();
|
|
|
-
|
|
|
- //心跳-- 方法内部线程启动
|
|
|
- let write_tx_clone1 = write_tx_am.clone();
|
|
|
- tokio::spawn(async move {
|
|
|
- trace!("线程-异步心跳-开始");
|
|
|
- AbstractWsMode::ping_or_pong(write_tx_clone1, HeartbeatType::Ping, heartbeat_time as u64).await;
|
|
|
- trace!("线程-异步心跳-结束");
|
|
|
- });
|
|
|
-
|
|
|
-
|
|
|
- //设置订阅
|
|
|
- let subscribe_array = subscription.clone();
|
|
|
- if login_is {
|
|
|
- //登录相关
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- //1 链接
|
|
|
-
|
|
|
- let t2 = tokio::spawn(async move {
|
|
|
- trace!("线程-异步链接-开始");
|
|
|
- match AbstractWsMode::ws_connect_async(bool_v1, address_url.clone(),
|
|
|
- label.clone(), subscribe_array,
|
|
|
- write_rx, read_tx,
|
|
|
- Self::message_text,
|
|
|
- Self::message_ping,
|
|
|
- Self::message_pong,
|
|
|
- ).await {
|
|
|
- Ok(_) => { trace!("线程-异步链接-结束"); }
|
|
|
- Err(e) => { trace!("发生异常:kucoin-期货链接关闭-{:?}",e); }
|
|
|
- }
|
|
|
- });
|
|
|
- tokio::try_join!(t2).unwrap();
|
|
|
- trace!("线程-心跳与链接-结束");
|
|
|
-
|
|
|
-
|
|
|
- Ok(())
|
|
|
- }
|
|
|
- /*******************************************************************************************************/
|
|
|
- /*****************************************数据解析*****************************************************/
|
|
|
- /*******************************************************************************************************/
|
|
|
- //数据解析-Text
|
|
|
- pub fn message_text(text: String) -> Option<ResponseData> {
|
|
|
- let response_data = Self::ok_text(text);
|
|
|
- Option::from(response_data)
|
|
|
- }
|
|
|
- //数据解析-ping
|
|
|
- pub fn message_ping(_pi: Vec<u8>) -> Option<ResponseData> {
|
|
|
- return Option::from(ResponseData::new("".to_string(), "-300".to_string(), "success".to_string(), "".to_string()));
|
|
|
- }
|
|
|
- //数据解析-pong
|
|
|
- pub fn message_pong(_po: Vec<u8>) -> Option<ResponseData> {
|
|
|
- return Option::from(ResponseData::new("".to_string(), "-301".to_string(), "success".to_string(), "".to_string()));
|
|
|
- }
|
|
|
- //数据解析
|
|
|
- pub fn ok_text(text: String) -> ResponseData
|
|
|
- {
|
|
|
- // trace!("原始数据:{:?}",text);
|
|
|
- let mut res_data = ResponseData::new("".to_string(), "200".to_string(), "success".to_string(), "".to_string());
|
|
|
- let json_value: serde_json::Value = serde_json::from_str(&text).unwrap();
|
|
|
-
|
|
|
- //订阅 相应
|
|
|
- if json_value["type"].as_str() == Option::from("welcome") {
|
|
|
- //链接成功
|
|
|
- res_data.code = "-200".to_string();
|
|
|
- res_data.message = "链接成功,主动发起订阅".to_string();
|
|
|
- } else if json_value["type"].as_str() == Option::from("ack") {
|
|
|
- res_data.code = "-201".to_string();
|
|
|
- res_data.message = "订阅成功".to_string();
|
|
|
- } else if json_value["type"].as_str() == Option::from("error") {
|
|
|
- res_data.code = format!("{}", json_value["code"]);
|
|
|
- res_data.message = format!("{}", json_value["data"].as_str().unwrap());
|
|
|
- } else if json_value.get("topic").is_some() {
|
|
|
- res_data.channel = format!("{}", json_value["subject"].as_str().unwrap());
|
|
|
-
|
|
|
- if json_value["topic"].as_str() == Option::from("/contractAccount/wallet") {
|
|
|
- res_data.code = "".to_string();
|
|
|
- if json_value["subject"].as_str() == Option::from("availableBalance.change") {
|
|
|
- res_data.code = "200".to_string();
|
|
|
- res_data.data = json_value["data"].to_string();
|
|
|
- } else {}
|
|
|
- } else {
|
|
|
- res_data.data = json_value["data"].to_string();
|
|
|
- }
|
|
|
- } else {
|
|
|
- res_data.code = "".to_string();
|
|
|
- res_data.message = "未知解析".to_string();
|
|
|
- }
|
|
|
- res_data
|
|
|
- }
|
|
|
-}
|
|
|
+// use std::collections::BTreeMap;
|
|
|
+// use std::sync::Arc;
|
|
|
+// use std::sync::atomic::AtomicBool;
|
|
|
+//
|
|
|
+// use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
|
|
|
+// use tokio::sync::Mutex;
|
|
|
+// use tokio_tungstenite::tungstenite::{Error, Message};
|
|
|
+// use tracing::{error, info, trace};
|
|
|
+//
|
|
|
+// use crate::kucoin_swap_rest::KucoinSwapRest;
|
|
|
+// use crate::response_base::ResponseData;
|
|
|
+// use crate::socket_tool::{AbstractWsMode, HeartbeatType};
|
|
|
+//
|
|
|
+// //类型
|
|
|
+// pub enum KucoinSwapWsType {
|
|
|
+// Public,
|
|
|
+// Private,
|
|
|
+// }
|
|
|
+//
|
|
|
+//
|
|
|
+// #[derive(Debug)]
|
|
|
+// #[derive(Clone)]
|
|
|
+// pub struct KucoinSwapWsParam {
|
|
|
+// pub token: String,
|
|
|
+// pub ws_url: String,
|
|
|
+// pub ws_ping_interval: i64,
|
|
|
+// pub ws_ping_timeout: i64,
|
|
|
+// pub is_ok_subscribe: bool,
|
|
|
+// }
|
|
|
+//
|
|
|
+// //订阅频道
|
|
|
+// #[derive(Clone)]
|
|
|
+// pub enum KucoinSwapSubscribeType {
|
|
|
+// PuContractMarketLevel2Depth50,
|
|
|
+// //买卖盘 快照,asks:卖,bids:买入
|
|
|
+// PuContractMarketExecution,
|
|
|
+// PuContractMarkettickerV2,
|
|
|
+//
|
|
|
+// PrContractAccountWallet,
|
|
|
+// PrContractPosition,
|
|
|
+// PrContractMarketTradeOrdersSys,
|
|
|
+// PrContractMarketTradeOrders,
|
|
|
+// }
|
|
|
+//
|
|
|
+// //账号信息
|
|
|
+// #[derive(Clone, Debug)]
|
|
|
+// pub struct KucoinSwapLogin {
|
|
|
+// pub access_key: String,
|
|
|
+// pub secret_key: String,
|
|
|
+// pub pass_key: String,
|
|
|
+// }
|
|
|
+//
|
|
|
+// #[derive(Clone)]
|
|
|
+// #[allow(dead_code)]
|
|
|
+// pub struct KucoinSwapWs {
|
|
|
+// //类型
|
|
|
+// label: String,
|
|
|
+// //地址
|
|
|
+// address_url: String,
|
|
|
+// //账号
|
|
|
+// login_param: Option<KucoinSwapLogin>,
|
|
|
+// //登陆数据
|
|
|
+// ws_param: KucoinSwapWsParam,
|
|
|
+// //币对
|
|
|
+// symbol_s: Vec<String>,
|
|
|
+// //订阅
|
|
|
+// subscribe_types: Vec<KucoinSwapSubscribeType>,
|
|
|
+// //心跳间隔
|
|
|
+// heartbeat_time: u64,
|
|
|
+// }
|
|
|
+//
|
|
|
+// impl KucoinSwapWs {
|
|
|
+// /*******************************************************************************************************/
|
|
|
+// /*****************************************获取一个对象****************************************************/
|
|
|
+// /*******************************************************************************************************/
|
|
|
+// pub async fn new(is_colo: bool, login_param: Option<KucoinSwapLogin>, ws_type: KucoinSwapWsType) -> KucoinSwapWs {
|
|
|
+// return Self::new_label("default-KucoinSwapWs".to_string(), is_colo, login_param, ws_type).await;
|
|
|
+// }
|
|
|
+// pub async fn new_label(label: String, is_colo: bool, login_param: Option<KucoinSwapLogin>, ws_type: KucoinSwapWsType) -> KucoinSwapWs {
|
|
|
+// /*******公共频道-私有频道数据组装*/
|
|
|
+// let mut ws_param = KucoinSwapWsParam {
|
|
|
+// token: "".to_string(),
|
|
|
+// ws_url: "".to_string(),
|
|
|
+// ws_ping_interval: 0,
|
|
|
+// ws_ping_timeout: 0,
|
|
|
+// is_ok_subscribe: false,
|
|
|
+// };
|
|
|
+//
|
|
|
+// /*******公共频道-私有频道数据组装*/
|
|
|
+// let res_data = Self::get_rul_token(ws_type, login_param.clone()).await;
|
|
|
+// let address_url = match res_data {
|
|
|
+// Ok(param) => {
|
|
|
+// ws_param = param;
|
|
|
+// format!("{}?token={}", ws_param.ws_url, ws_param.token)
|
|
|
+// }
|
|
|
+// Err(error) => {
|
|
|
+// error!("-链接地址等参数错误:{:?}", error);
|
|
|
+// "".to_string()
|
|
|
+// }
|
|
|
+// };
|
|
|
+//
|
|
|
+// if is_colo {
|
|
|
+// info!("开启高速(未配置,走普通:{})通道",address_url);
|
|
|
+// } else {
|
|
|
+// info!("走普通通道:{}",address_url);
|
|
|
+// }
|
|
|
+//
|
|
|
+// KucoinSwapWs {
|
|
|
+// label,
|
|
|
+// address_url,
|
|
|
+// login_param,
|
|
|
+// ws_param,
|
|
|
+// symbol_s: vec![],
|
|
|
+// subscribe_types: vec![],
|
|
|
+// heartbeat_time: 1000 * 18,
|
|
|
+// }
|
|
|
+// }
|
|
|
+//
|
|
|
+// /*******************************************************************************************************/
|
|
|
+// /*****************************************订阅函数********************************************************/
|
|
|
+// /*******************************************************************************************************/
|
|
|
+// //根据当前类型获取对应的频道 地址 与 token
|
|
|
+// async fn get_rul_token(ws_type: KucoinSwapWsType, login_param: Option<KucoinSwapLogin>) -> Result<KucoinSwapWsParam, reqwest::Error> {
|
|
|
+// let mut kucoin_exc = KucoinSwapRest::new(false, match login_param {
|
|
|
+// None => {
|
|
|
+// let btree_map: BTreeMap<String, String> = BTreeMap::new();
|
|
|
+// btree_map
|
|
|
+// }
|
|
|
+// Some(d) => {
|
|
|
+// let mut btree_map: BTreeMap<String, String> = BTreeMap::new();
|
|
|
+// btree_map.insert("access_key".to_string(), d.access_key);
|
|
|
+// btree_map.insert("secret_key".to_string(), d.secret_key);
|
|
|
+// btree_map.insert("pass_key".to_string(), d.pass_key);
|
|
|
+// btree_map
|
|
|
+// }
|
|
|
+// });
|
|
|
+//
|
|
|
+//
|
|
|
+// let res_data = match ws_type {
|
|
|
+// KucoinSwapWsType::Public => {
|
|
|
+// kucoin_exc.get_public_token().await
|
|
|
+// }
|
|
|
+// KucoinSwapWsType::Private => {
|
|
|
+// kucoin_exc.get_private_token().await
|
|
|
+// }
|
|
|
+// };
|
|
|
+//
|
|
|
+// trace!("kucoin-swap-rest 获取ws连接地址:{:?}",res_data);
|
|
|
+//
|
|
|
+// if res_data.code == "200" {
|
|
|
+// let mut ws_url = "".to_string();
|
|
|
+// let mut ws_token = "".to_string();
|
|
|
+// let mut ws_ping_interval: i64 = 0;
|
|
|
+// let mut ws_ping_timeout: i64 = 0;
|
|
|
+//
|
|
|
+//
|
|
|
+// //数据解析
|
|
|
+// let parsed_json: serde_json::Value = serde_json::from_str(res_data.data.as_str()).unwrap();
|
|
|
+// if let Some(value) = parsed_json.get("token") {
|
|
|
+// let formatted_value = match value {
|
|
|
+// serde_json::Value::String(s) => s.clone(),
|
|
|
+// _ => value.to_string()
|
|
|
+// };
|
|
|
+// ws_token = format!("{}", formatted_value);
|
|
|
+// }
|
|
|
+// if let Some(endpoint) = parsed_json["instanceServers"][0]["endpoint"].as_str() {
|
|
|
+// ws_url = format!("{}", endpoint);
|
|
|
+// }
|
|
|
+// if let Some(ping_interval) = parsed_json["instanceServers"][0]["pingInterval"].as_i64() {
|
|
|
+// ws_ping_interval = ping_interval;
|
|
|
+// }
|
|
|
+// if let Some(ping_timeout) = parsed_json["instanceServers"][0]["pingTimeout"].as_i64() {
|
|
|
+// ws_ping_timeout = ping_timeout;
|
|
|
+// }
|
|
|
+//
|
|
|
+//
|
|
|
+// Ok(KucoinSwapWsParam { ws_url, token: ws_token, ws_ping_interval, ws_ping_timeout, is_ok_subscribe: false })
|
|
|
+// } else {
|
|
|
+// error!("公共/私有-频道获取失败:{:?}", res_data);
|
|
|
+// panic!("公共/私有-频道获取失败:{:?}", res_data);
|
|
|
+// }
|
|
|
+// }
|
|
|
+// //手动添加订阅信息
|
|
|
+// pub fn set_subscribe(&mut self, subscribe_types: Vec<KucoinSwapSubscribeType>) {
|
|
|
+// self.subscribe_types.extend(subscribe_types);
|
|
|
+// }
|
|
|
+// //手动添加币对
|
|
|
+// pub fn set_symbols(&mut self, mut b_array: Vec<String>) {
|
|
|
+// for symbol in b_array.iter_mut() {
|
|
|
+// // 大写
|
|
|
+// *symbol = symbol.to_uppercase();
|
|
|
+// // 字符串替换
|
|
|
+// *symbol = symbol.replace("_", "");
|
|
|
+// *symbol = symbol.replace("-", "");
|
|
|
+// }
|
|
|
+// self.symbol_s = b_array;
|
|
|
+// }
|
|
|
+// fn contains_pr(&self) -> bool {
|
|
|
+// for t in self.subscribe_types.clone() {
|
|
|
+// if match t {
|
|
|
+// KucoinSwapSubscribeType::PuContractMarketLevel2Depth50 => false,
|
|
|
+// KucoinSwapSubscribeType::PuContractMarketExecution => false,
|
|
|
+// KucoinSwapSubscribeType::PuContractMarkettickerV2 => false,
|
|
|
+//
|
|
|
+// KucoinSwapSubscribeType::PrContractAccountWallet => true,
|
|
|
+// KucoinSwapSubscribeType::PrContractPosition => true,
|
|
|
+// KucoinSwapSubscribeType::PrContractMarketTradeOrdersSys => true,
|
|
|
+// KucoinSwapSubscribeType::PrContractMarketTradeOrders => true,
|
|
|
+// } {
|
|
|
+// return true;
|
|
|
+// }
|
|
|
+// }
|
|
|
+// false
|
|
|
+// }
|
|
|
+// /*******************************************************************************************************/
|
|
|
+// /*****************************************工具函数********************************************************/
|
|
|
+// /*******************************************************************************************************/
|
|
|
+// //订阅枚举解析
|
|
|
+// pub fn enum_to_string(symbol: String, subscribe_type: KucoinSwapSubscribeType) -> serde_json::Value {
|
|
|
+// match subscribe_type {
|
|
|
+// KucoinSwapSubscribeType::PuContractMarketLevel2Depth50 => {//level2
|
|
|
+// serde_json::json!({
|
|
|
+// "topic": format!("/contractMarket/level2Depth50:{}", symbol),
|
|
|
+// "type": "subscribe",
|
|
|
+// "response": true
|
|
|
+// })
|
|
|
+// }
|
|
|
+// KucoinSwapSubscribeType::PuContractMarketExecution => {//match
|
|
|
+// serde_json::json!({
|
|
|
+// "topic": format!("/contractMarket/execution:{}", symbol),
|
|
|
+// "type": "subscribe",
|
|
|
+// "response": true
|
|
|
+// })
|
|
|
+// }
|
|
|
+// KucoinSwapSubscribeType::PuContractMarkettickerV2 => {//tickerV2
|
|
|
+// serde_json::json!({
|
|
|
+// "topic": format!("/contractMarket/tickerV2:{}", symbol),
|
|
|
+// "type": "subscribe",
|
|
|
+// "response": true
|
|
|
+// })
|
|
|
+// }
|
|
|
+// KucoinSwapSubscribeType::PrContractAccountWallet => {//orderMargin.change
|
|
|
+// serde_json::json!({
|
|
|
+// "type": "subscribe",
|
|
|
+// "topic": "/contractAccount/wallet",
|
|
|
+// "privateChannel":true,
|
|
|
+// "response":true,
|
|
|
+// })
|
|
|
+// }
|
|
|
+// KucoinSwapSubscribeType::PrContractPosition => {//position.change
|
|
|
+// serde_json::json!({
|
|
|
+// "type": "subscribe",
|
|
|
+// "topic": format!("/contract/position:{}", symbol),
|
|
|
+// "privateChannel":true,
|
|
|
+// "response":true,
|
|
|
+// })
|
|
|
+// }
|
|
|
+// KucoinSwapSubscribeType::PrContractMarketTradeOrdersSys => {//orderChange
|
|
|
+// serde_json::json!({
|
|
|
+// "type": "subscribe",
|
|
|
+// "topic": format!("/contractMarket/tradeOrders"),
|
|
|
+// "privateChannel":true,
|
|
|
+// "response":true,
|
|
|
+// })
|
|
|
+// }
|
|
|
+// KucoinSwapSubscribeType::PrContractMarketTradeOrders => {//symbolOrderChange
|
|
|
+// serde_json::json!({
|
|
|
+// "type": "subscribe",
|
|
|
+// "topic": format!("/contractMarket/tradeOrders:{}", symbol),
|
|
|
+// "privateChannel":true,
|
|
|
+// "response":true,
|
|
|
+// })
|
|
|
+// }
|
|
|
+// }
|
|
|
+// }
|
|
|
+// //订阅信息生成
|
|
|
+// pub fn get_subscription(&self) -> Vec<String> {
|
|
|
+// let mut array = vec![];
|
|
|
+// for symbol in &self.symbol_s {
|
|
|
+// for subscribe_type in &self.subscribe_types {
|
|
|
+// let ty_str = Self::enum_to_string(symbol.clone(), subscribe_type.clone());
|
|
|
+// array.push(ty_str.to_string());
|
|
|
+// }
|
|
|
+// }
|
|
|
+// array
|
|
|
+// }
|
|
|
+// /*******************************************************************************************************/
|
|
|
+// /*****************************************socket基本*****************************************************/
|
|
|
+// /*******************************************************************************************************/
|
|
|
+// //链接
|
|
|
+// pub async fn ws_connect_async(&mut self,
|
|
|
+// is_shutdown_arc: Arc<AtomicBool>,
|
|
|
+// write_tx_am: &Arc<Mutex<UnboundedSender<Message>>>,
|
|
|
+// write_rx: UnboundedReceiver<Message>,
|
|
|
+// read_tx: UnboundedSender<ResponseData>,
|
|
|
+// ) -> Result<(), Error>
|
|
|
+// {
|
|
|
+// let login_is = self.contains_pr();
|
|
|
+// let subscription = self.get_subscription();
|
|
|
+// let address_url = self.address_url.clone();
|
|
|
+// let label = self.label.clone();
|
|
|
+// let heartbeat_time = self.ws_param.ws_ping_interval.clone();
|
|
|
+//
|
|
|
+// //心跳-- 方法内部线程启动
|
|
|
+// let write_tx_clone1 = write_tx_am.clone();
|
|
|
+// tokio::spawn(async move {
|
|
|
+// trace!("线程-异步心跳-开始");
|
|
|
+// AbstractWsMode::ping_or_pong(write_tx_clone1, HeartbeatType::Ping, heartbeat_time as u64).await;
|
|
|
+// trace!("线程-异步心跳-结束");
|
|
|
+// });
|
|
|
+//
|
|
|
+//
|
|
|
+// //设置订阅
|
|
|
+// let subscribe_array = subscription.clone();
|
|
|
+// if login_is {
|
|
|
+// //登录相关
|
|
|
+// }
|
|
|
+//
|
|
|
+//
|
|
|
+// //1 链接
|
|
|
+//
|
|
|
+// let t2 = tokio::spawn(async move {
|
|
|
+// trace!("线程-异步链接-开始");
|
|
|
+// match AbstractWsMode::ws_connect_async(is_shutdown_arc, address_url.clone(),
|
|
|
+// label.clone(), subscribe_array,
|
|
|
+// write_rx, read_tx,
|
|
|
+// Self::message_text,
|
|
|
+// Self::message_ping,
|
|
|
+// Self::message_pong,
|
|
|
+// ).await {
|
|
|
+// Ok(_) => { trace!("线程-异步链接-结束"); }
|
|
|
+// Err(e) => { trace!("发生异常:kucoin-期货链接关闭-{:?}",e); }
|
|
|
+// }
|
|
|
+// });
|
|
|
+// tokio::try_join!(t2).unwrap();
|
|
|
+// trace!("线程-心跳与链接-结束");
|
|
|
+//
|
|
|
+//
|
|
|
+// Ok(())
|
|
|
+// }
|
|
|
+// /*******************************************************************************************************/
|
|
|
+// /*****************************************数据解析*****************************************************/
|
|
|
+// /*******************************************************************************************************/
|
|
|
+// //数据解析-Text
|
|
|
+// pub fn message_text(text: String) -> Option<ResponseData> {
|
|
|
+// let response_data = Self::ok_text(text);
|
|
|
+// Option::from(response_data)
|
|
|
+// }
|
|
|
+// //数据解析-ping
|
|
|
+// pub fn message_ping(_pi: Vec<u8>) -> Option<ResponseData> {
|
|
|
+// return Option::from(ResponseData::new("".to_string(), "-300".to_string(), "success".to_string(), "".to_string()));
|
|
|
+// }
|
|
|
+// //数据解析-pong
|
|
|
+// pub fn message_pong(_po: Vec<u8>) -> Option<ResponseData> {
|
|
|
+// return Option::from(ResponseData::new("".to_string(), "-301".to_string(), "success".to_string(), "".to_string()));
|
|
|
+// }
|
|
|
+// //数据解析
|
|
|
+// pub fn ok_text(text: String) -> ResponseData
|
|
|
+// {
|
|
|
+// // trace!("原始数据:{:?}",text);
|
|
|
+// let mut res_data = ResponseData::new("".to_string(), "200".to_string(), "success".to_string(), "".to_string());
|
|
|
+// let json_value: serde_json::Value = serde_json::from_str(&text).unwrap();
|
|
|
+//
|
|
|
+// //订阅 相应
|
|
|
+// if json_value["type"].as_str() == Option::from("welcome") {
|
|
|
+// //链接成功
|
|
|
+// res_data.code = "-200".to_string();
|
|
|
+// res_data.message = "链接成功,主动发起订阅".to_string();
|
|
|
+// } else if json_value["type"].as_str() == Option::from("ack") {
|
|
|
+// res_data.code = "-201".to_string();
|
|
|
+// res_data.message = "订阅成功".to_string();
|
|
|
+// } else if json_value["type"].as_str() == Option::from("error") {
|
|
|
+// res_data.code = format!("{}", json_value["code"]);
|
|
|
+// res_data.message = format!("{}", json_value["data"].as_str().unwrap());
|
|
|
+// } else if json_value.get("topic").is_some() {
|
|
|
+// res_data.channel = format!("{}", json_value["subject"].as_str().unwrap());
|
|
|
+//
|
|
|
+// if json_value["topic"].as_str() == Option::from("/contractAccount/wallet") {
|
|
|
+// res_data.code = "".to_string();
|
|
|
+// if json_value["subject"].as_str() == Option::from("availableBalance.change") {
|
|
|
+// res_data.code = "200".to_string();
|
|
|
+// res_data.data = json_value["data"].to_string();
|
|
|
+// } else {}
|
|
|
+// } else {
|
|
|
+// res_data.data = json_value["data"].to_string();
|
|
|
+// }
|
|
|
+// } else {
|
|
|
+// res_data.code = "".to_string();
|
|
|
+// res_data.message = "未知解析".to_string();
|
|
|
+// }
|
|
|
+// res_data
|
|
|
+// }
|
|
|
+// }
|