|
|
@@ -16,7 +16,8 @@ use crate::socket_tool::AbstractWsMode;
|
|
|
|
|
|
//类型
|
|
|
pub enum BinanceSwapWsType {
|
|
|
- PublicAndPrivate,
|
|
|
+ Public,
|
|
|
+ Private,
|
|
|
}
|
|
|
|
|
|
//订阅频道
|
|
|
@@ -27,7 +28,8 @@ pub enum BinanceSwapSubscribeType {
|
|
|
PuDepth20levels100ms,
|
|
|
|
|
|
PrAccount,
|
|
|
- PrBalance
|
|
|
+ PrBalance,
|
|
|
+ PrPosition
|
|
|
}
|
|
|
|
|
|
//账号信息
|
|
|
@@ -38,6 +40,14 @@ pub struct BinanceSwapLogin {
|
|
|
pub api_secret: String,
|
|
|
}
|
|
|
|
|
|
+//login
|
|
|
+#[derive(Clone)]
|
|
|
+#[allow(dead_code)]
|
|
|
+pub struct BinanceSwapLoginLinLink {
|
|
|
+ listenKey: String,
|
|
|
+ out_time: i64,
|
|
|
+}
|
|
|
+
|
|
|
#[derive(Clone)]
|
|
|
#[allow(dead_code)]
|
|
|
pub struct BinanceSwapWs {
|
|
|
@@ -47,28 +57,62 @@ pub struct BinanceSwapWs {
|
|
|
address_url: String,
|
|
|
//账号
|
|
|
login_param: Option<BinanceSwapLogin>,
|
|
|
+ //link _param
|
|
|
+ link_param: BinanceSwapLoginLinLink,
|
|
|
//币对
|
|
|
symbol_s: Vec<String>,
|
|
|
//订阅
|
|
|
subscribe_types: Vec<BinanceSwapSubscribeType>,
|
|
|
//心跳间隔
|
|
|
heartbeat_time: u64,
|
|
|
+
|
|
|
}
|
|
|
|
|
|
impl BinanceSwapWs {
|
|
|
/*******************************************************************************************************/
|
|
|
/*****************************************获取一个对象****************************************************/
|
|
|
/*******************************************************************************************************/
|
|
|
- pub fn new(is_colo: bool, login_param: Option<BinanceSwapLogin>, ws_type: BinanceSwapWsType) -> BinanceSwapWs {
|
|
|
- return BinanceSwapWs::new_label("default-BinanceSwapWs".to_string(), is_colo, login_param, ws_type);
|
|
|
+ pub async fn new(is_colo: bool, login_param: Option<BinanceSwapLogin>, ws_type: BinanceSwapWsType) -> BinanceSwapWs {
|
|
|
+ return BinanceSwapWs::new_label("default-BinanceSwapWs".to_string(), is_colo, login_param, ws_type).await;
|
|
|
}
|
|
|
- pub fn new_label(label: String, is_colo: bool, login_param: Option<BinanceSwapLogin>, ws_type: BinanceSwapWsType) -> BinanceSwapWs {
|
|
|
+ pub async fn new_label(label: String, is_colo: bool, login_param: Option<BinanceSwapLogin>, ws_type: BinanceSwapWsType) -> BinanceSwapWs {
|
|
|
+ let mut link_param = BinanceSwapLoginLinLink {
|
|
|
+ listenKey: "".to_string(),
|
|
|
+ out_time: 0,
|
|
|
+ };
|
|
|
/*******公共频道-私有频道数据组装*/
|
|
|
let address_url = match ws_type {
|
|
|
- BinanceSwapWsType::PublicAndPrivate => {
|
|
|
+ BinanceSwapWsType::Public => {
|
|
|
// "wss://fstream.binance.com/stream?streams=btcusdt@depth20@100ms".to_string(),
|
|
|
"wss://fstream.binance.com/stream".to_string()
|
|
|
}
|
|
|
+ BinanceSwapWsType::Private => {
|
|
|
+ "wss://fstream.binance.com/ws/".to_string()
|
|
|
+ // match login_param.clone() {
|
|
|
+ // None => {
|
|
|
+ // "wss://fstream.binance.com/stream".to_string()
|
|
|
+ // }
|
|
|
+ // Some(dto) => {
|
|
|
+ // let mut btree_map: BTreeMap<String, String> = BTreeMap::new();
|
|
|
+ // btree_map.insert("access_key".to_string(), dto.api_key.clone());
|
|
|
+ // btree_map.insert("secret_key".to_string(), dto.api_secret.clone());
|
|
|
+ //
|
|
|
+ // let mut ba_exc = BinanceSwapRest::new(false, btree_map);
|
|
|
+ // let rep_data = ba_exc.close_listen_key(json!({ })).await;
|
|
|
+ // if (rep_data.code == 200) {
|
|
|
+ // let listenKey = rep_data.data["listenKey"].as_str().unwrap();
|
|
|
+ // trace!("拿到-data:{}",rep_data.data.clone());
|
|
|
+ // trace!("拿到-listenKey:{}",listenKey.clone());
|
|
|
+ // link_param.listenKey = listenKey.to_string();
|
|
|
+ // link_param.out_time = chrono::Utc::now().timestamp_millis() + (1000 * 60 * 55);
|
|
|
+ // format!("wss://fstream.binance.com/ws/{}", listenKey)
|
|
|
+ // } else {
|
|
|
+ // error!("binance_usdt_swap socket get listenKey, is error");
|
|
|
+ // "".to_string()
|
|
|
+ // }
|
|
|
+ // }
|
|
|
+ // }
|
|
|
+ }
|
|
|
};
|
|
|
|
|
|
if is_colo {
|
|
|
@@ -81,6 +125,7 @@ impl BinanceSwapWs {
|
|
|
label,
|
|
|
address_url,
|
|
|
login_param,
|
|
|
+ link_param,
|
|
|
symbol_s: vec![],
|
|
|
subscribe_types: vec![],
|
|
|
heartbeat_time: 1000 * 20,
|
|
|
@@ -113,8 +158,9 @@ impl BinanceSwapWs {
|
|
|
BinanceSwapSubscribeType::PuAggTrade => false,
|
|
|
BinanceSwapSubscribeType::PuDepth20levels100ms => false,
|
|
|
|
|
|
- BinanceSwapSubscribeType::PrAccount => {true}
|
|
|
- BinanceSwapSubscribeType::PrBalance => {true}
|
|
|
+ BinanceSwapSubscribeType::PrAccount => { true }
|
|
|
+ BinanceSwapSubscribeType::PrBalance => { true }
|
|
|
+ BinanceSwapSubscribeType::PrPosition => {true}
|
|
|
} {
|
|
|
return true;
|
|
|
}
|
|
|
@@ -143,9 +189,12 @@ impl BinanceSwapWs {
|
|
|
BinanceSwapSubscribeType::PrBalance => {
|
|
|
"".to_string()
|
|
|
}
|
|
|
+ BinanceSwapSubscribeType::PrPosition => {
|
|
|
+ "".to_string()
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
- pub fn enum_to_string_pr(listenKey: String, subscribe_type: BinanceSwapSubscribeType) -> String {
|
|
|
+ pub fn enum_to_string_pr(subscribe_type: BinanceSwapSubscribeType) -> String {
|
|
|
match subscribe_type {
|
|
|
BinanceSwapSubscribeType::PuAggTrade => {
|
|
|
"".to_string()
|
|
|
@@ -157,10 +206,13 @@ impl BinanceSwapWs {
|
|
|
"".to_string()
|
|
|
}
|
|
|
BinanceSwapSubscribeType::PrAccount => {
|
|
|
- format!("{}@account", listenKey)
|
|
|
+ "@account".to_string()
|
|
|
}
|
|
|
BinanceSwapSubscribeType::PrBalance => {
|
|
|
- format!("{}@balance", listenKey)
|
|
|
+ "@balance".to_string()
|
|
|
+ }
|
|
|
+ BinanceSwapSubscribeType::PrPosition => {
|
|
|
+ "@position".to_string()
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
@@ -170,31 +222,44 @@ impl BinanceSwapWs {
|
|
|
for symbol in &self.symbol_s {
|
|
|
for subscribe_type in &self.subscribe_types {
|
|
|
let ty_str = Self::enum_to_string_pu(symbol.clone(), subscribe_type.clone());
|
|
|
- params.push(ty_str);
|
|
|
+ if (ty_str != "".to_string()) {
|
|
|
+ params.push(ty_str);
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- let str = json!({
|
|
|
+ if (params.len() > 0) {
|
|
|
+ let str = json!({
|
|
|
"method": "SUBSCRIBE",
|
|
|
"params": params,
|
|
|
"id": 1
|
|
|
});
|
|
|
- str.to_string()
|
|
|
+ str.to_string()
|
|
|
+ } else {
|
|
|
+ "".to_string()
|
|
|
+ }
|
|
|
}
|
|
|
- pub fn get_subscription_pr(&self,listenKey:String) -> String {
|
|
|
+ pub fn get_subscription_pr(&self) -> Vec<String> {
|
|
|
let mut params = vec![];
|
|
|
- for symbol in &self.symbol_s {
|
|
|
- for subscribe_type in &self.subscribe_types {
|
|
|
- let ty_str = Self::enum_to_string_pr(listenKey.clone(), subscribe_type.clone());
|
|
|
+ for subscribe_type in &self.subscribe_types {
|
|
|
+ let ty_str = Self::enum_to_string_pr(subscribe_type.clone());
|
|
|
+ if (ty_str != "".to_string()) {
|
|
|
params.push(ty_str);
|
|
|
}
|
|
|
}
|
|
|
- let str = json!({
|
|
|
- "method": "REQUEST",
|
|
|
- "params":params,
|
|
|
- "id": 12 // request ID.
|
|
|
- });
|
|
|
- str.to_string()
|
|
|
+
|
|
|
+ params
|
|
|
+
|
|
|
+ // if (params.len() > 0) {
|
|
|
+ // let str = json!({
|
|
|
+ // "method": "REQUEST",
|
|
|
+ // "params":params,
|
|
|
+ // "id": 12 // request ID.
|
|
|
+ // });
|
|
|
+ // str.to_string()
|
|
|
+ // } else {
|
|
|
+ // "".to_string()
|
|
|
+ // }
|
|
|
}
|
|
|
/*******************************************************************************************************/
|
|
|
/*****************************************socket基本*****************************************************/
|
|
|
@@ -205,12 +270,14 @@ impl BinanceSwapWs {
|
|
|
handle_function: F,
|
|
|
_write_tx_am: &Arc<Mutex<UnboundedSender<Message>>>,
|
|
|
write_to_socket_rx: UnboundedReceiver<Message>) -> Result<(), Error>
|
|
|
- where
|
|
|
- F: Fn(ResponseData) -> Future + Clone + Send + 'static + Sync,
|
|
|
- Future: std::future::Future<Output=()> + Send + 'static, // 确保 Fut 是一个 Future,且输出类型为 ()
|
|
|
+ where
|
|
|
+ F: Fn(ResponseData) -> Future + Clone + Send + 'static + Sync,
|
|
|
+ Future: std::future::Future<Output=()> + Send + 'static, // 确保 Fut 是一个 Future,且输出类型为 ()
|
|
|
{
|
|
|
let login_is = self.contains_pr();
|
|
|
+ let link_param = self.link_param.clone();
|
|
|
let subscription_pu = self.get_subscription_pu();
|
|
|
+ let subscription_pr = self.get_subscription_pr();
|
|
|
let address_url = self.address_url.clone();
|
|
|
let label = self.label.clone();
|
|
|
let heartbeat_time = self.heartbeat_time.clone();
|
|
|
@@ -226,30 +293,11 @@ impl BinanceSwapWs {
|
|
|
// });
|
|
|
|
|
|
//设置订阅
|
|
|
- let mut subscribe_array = vec![];
|
|
|
- if login_is {
|
|
|
- //登录相关
|
|
|
- match login_param_dto {
|
|
|
- None => {
|
|
|
- error!("binance_usdt_swap socket 账号信息……");
|
|
|
- }
|
|
|
- Some(dto) => {
|
|
|
- let mut btree_map: BTreeMap<String, String> = BTreeMap::new();
|
|
|
- btree_map.insert("access_key".to_string(), dto.api_key.clone());
|
|
|
- btree_map.insert("secret_key".to_string(), dto.api_secret.clone());
|
|
|
-
|
|
|
- let mut ba_exc = BinanceSwapRest::new(false, btree_map);
|
|
|
- let rep_data = ba_exc.close_listen_key(json!({ })).await;
|
|
|
- if (rep_data.code == 200) {
|
|
|
- let listenKey = rep_data.data["listenKey"].as_str().unwrap();
|
|
|
- trace!("拿到-listenKey:{}",listenKey.clone());
|
|
|
- let subscription_pr = self.get_subscription_pr(listenKey.to_string());
|
|
|
- subscribe_array.push(subscription_pu.to_string());
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- subscribe_array.push(subscription_pu.to_string());
|
|
|
+ // let mut subscribe_array = vec![];
|
|
|
+ //
|
|
|
+ // if (subscription_pu.len() > 0) {
|
|
|
+ // subscribe_array.push(subscription_pu.to_string());
|
|
|
+ // }
|
|
|
|
|
|
//链接
|
|
|
let t2 = tokio::spawn(async move {
|
|
|
@@ -257,8 +305,65 @@ impl BinanceSwapWs {
|
|
|
|
|
|
loop {
|
|
|
info!("binance_usdt_swap socket 连接中……");
|
|
|
+ // get listen_key
|
|
|
+ let listen_key = if login_is {
|
|
|
+ match login_param_dto.clone() {
|
|
|
+ None => {
|
|
|
+ "".to_string()
|
|
|
+ }
|
|
|
+ Some(dto) => {
|
|
|
+ let mut btree_map: BTreeMap<String, String> = BTreeMap::new();
|
|
|
+ btree_map.insert("access_key".to_string(), dto.api_key.clone());
|
|
|
+ btree_map.insert("secret_key".to_string(), dto.api_secret.clone());
|
|
|
+
|
|
|
+ let mut ba_exc = BinanceSwapRest::new(false, btree_map);
|
|
|
+ let rep_data = ba_exc.get_listen_key(json!({ })).await;
|
|
|
+ trace!("拿到-rep_data:{:?}",rep_data) ;
|
|
|
+ if (rep_data.code == 200) {
|
|
|
+ let listenKey = rep_data.data["listenKey"].as_str().unwrap();
|
|
|
+ trace!("拿到-data:{}",rep_data.data.clone());
|
|
|
+ trace!("拿到-listenKey:{}",listenKey.clone());
|
|
|
+ // format!("{}{}", address_url.clone(), listenKey
|
|
|
+ listenKey.to_string()
|
|
|
+ } else {
|
|
|
+ "".to_string()
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ "".to_string()
|
|
|
+ };
|
|
|
+
|
|
|
+ let this_address_url = if login_is {
|
|
|
+ if listen_key.len() == 0 {
|
|
|
+ error!("binance_usdt_swap socket get listenKey, is error");
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ format!("{}{}", address_url.clone(), listen_key)
|
|
|
+ } else {
|
|
|
+ address_url.clone()
|
|
|
+ };
|
|
|
+
|
|
|
+ let mut subscribe_array = vec![];
|
|
|
+ if login_is {
|
|
|
+ let this_subscription_pr = subscription_pr.clone();
|
|
|
+ let mut params_pr: Vec<String> = vec![];
|
|
|
+ for dto in this_subscription_pr {
|
|
|
+ params_pr.push(format!("{}{}", listen_key, dto))
|
|
|
+ }
|
|
|
+ let json_dto = json!({
|
|
|
+ "method": "REQUEST",
|
|
|
+ "params":params_pr,
|
|
|
+ "id": 12 // request ID.
|
|
|
+ });
|
|
|
+ subscribe_array.push(json_dto.to_string());
|
|
|
+ } else {
|
|
|
+ subscribe_array.push(subscription_pu.to_string());
|
|
|
+ }
|
|
|
+
|
|
|
// ws层重连
|
|
|
- AbstractWsMode::ws_connect_async(is_shutdown_arc.clone(), handle_function.clone(), address_url.clone(),
|
|
|
+ AbstractWsMode::ws_connect_async(is_shutdown_arc.clone(), handle_function.clone(), this_address_url.clone(),
|
|
|
false, label.clone(), subscribe_array.clone(), write_to_socket_rx_arc.clone(),
|
|
|
Self::message_text, Self::message_ping, Self::message_pong, Self::message_binary).await;
|
|
|
|
|
|
@@ -296,8 +401,8 @@ impl BinanceSwapWs {
|
|
|
}
|
|
|
//数据解析
|
|
|
pub fn ok_text(text: String) -> ResponseData {
|
|
|
- // trace!("原始数据");
|
|
|
- // trace!(?text);
|
|
|
+ trace!("原始数据");
|
|
|
+ trace!(?text);
|
|
|
let mut res_data = ResponseData::new("".to_string(), 200, "success".to_string(), Value::Null);
|
|
|
let json_value: Value = serde_json::from_str(&text).unwrap();
|
|
|
|
|
|
@@ -306,10 +411,25 @@ impl BinanceSwapWs {
|
|
|
{
|
|
|
res_data.code = -201;
|
|
|
res_data.message = "订阅成功".to_string();
|
|
|
- } else if json_value.get("error").is_some() {//订阅返回
|
|
|
+ } else if json_value.get("result").is_some() && json_value.get("id").is_some() &&
|
|
|
+ json_value.get("id").unwrap() == 12
|
|
|
+ {
|
|
|
+ let result = json_value["result"].as_array().unwrap();
|
|
|
+ let mut channel = "".to_string();
|
|
|
+ for dto in result {
|
|
|
+ let req = dto["req"].as_str().unwrap();
|
|
|
+ if req.contains("@account") {
|
|
|
+ channel += "@aggTrade";
|
|
|
+ } else if req.contains("@balance") {
|
|
|
+ channel += "@balance";
|
|
|
+ }
|
|
|
+ }
|
|
|
+ res_data.code = -201;
|
|
|
+ res_data.message = format!("订阅成功:{}", channel)
|
|
|
+ } else if json_value.get("error").is_some() { //订阅返回
|
|
|
res_data.code = i16::from_str(json_value["error"]["code"].as_str().unwrap()).unwrap();
|
|
|
res_data.message = json_value["error"]["msg"].to_string();
|
|
|
- } else if json_value.get("stream").is_some() {//订阅返回
|
|
|
+ } else if json_value.get("stream").is_some() { //订阅返回
|
|
|
res_data.data = json_value["data"].clone();
|
|
|
res_data.code = 200;
|
|
|
|