|
|
@@ -1,190 +1,247 @@
|
|
|
-// use std::collections::BTreeMap;
|
|
|
-// use std::sync::Arc;
|
|
|
-// use std::sync::atomic::AtomicBool;
|
|
|
-// use rust_decimal::Decimal;
|
|
|
-// use tokio::spawn;
|
|
|
-// use tokio::sync::Mutex;
|
|
|
-// use tracing::{error, info};
|
|
|
-// use tokio_tungstenite::tungstenite::Message;
|
|
|
-// use exchanges::bitget_swap_ws::{BitgetSwapLogin, BitgetSwapSubscribeType, BitgetSwapWs, BitgetSwapWsType};
|
|
|
-// use exchanges::response_base::ResponseData;
|
|
|
-// use global::trace_stack::TraceStack;
|
|
|
-// use standard::exchange::ExchangeEnum::{BitgetSwap};
|
|
|
-// use standard::{Position, PositionModeEnum};
|
|
|
-// use crate::core::Core;
|
|
|
-// use crate::exchange_disguise::on_special_depth;
|
|
|
-// use crate::model::OrderInfo;
|
|
|
-//
|
|
|
-// pub async fn bitget_usdt_swap_run(is_shutdown_arc :Arc<AtomicBool>,
|
|
|
-// is_trade: bool,
|
|
|
-// core_arc: Arc<Mutex<Core>>,
|
|
|
-// name: String,
|
|
|
-// symbols: Vec<String>,
|
|
|
-// is_colo: bool,
|
|
|
-// exchange_params: BTreeMap<String, String>) {
|
|
|
-// // 开启公共频道
|
|
|
-// let (write_tx_public, write_rx_public) = futures_channel::mpsc::unbounded::<Message>();
|
|
|
-//
|
|
|
-// // 开启公共连接
|
|
|
-// let is_shutdown_arc_c1 = is_shutdown_arc.clone();
|
|
|
-// let write_tx_am_public = Arc::new(Mutex::new(write_tx_public));
|
|
|
-// let name_clone = name.clone();
|
|
|
-// let core_arc_clone = core_arc.clone();
|
|
|
-// let symbols_clone = symbols.clone();
|
|
|
-// spawn(async move {
|
|
|
-// // 构建链接ws
|
|
|
-// let mut bg_public = BitgetSwapWs::new_label(name_clone.clone(),
|
|
|
-// is_colo,
|
|
|
-// None,
|
|
|
-// BitgetSwapWsType::Public);
|
|
|
-//
|
|
|
-// // 消费数据的函数
|
|
|
-// let mut update_flag_u = Decimal::ZERO;
|
|
|
-// let fun = move |data: ResponseData| {
|
|
|
-// let core_arc_cc = core_arc_clone.clone();
|
|
|
-//
|
|
|
-// async move {
|
|
|
-// on_public_data(core_arc_cc, &mut update_flag_u, data).await
|
|
|
-// }
|
|
|
-// };
|
|
|
-//
|
|
|
-// // 准备链接
|
|
|
-// bg_public.set_subscribe(vec![BitgetSwapSubscribeType::PuBooks1]); // 只用订阅深度数据
|
|
|
-// bg_public.set_symbols(symbols_clone);
|
|
|
-// // bg_public.ws_connect_async(is_shutdown_arc_c1, fun, &write_tx_am_public, write_rx_public).await.expect("bitget_usdt_swap 链接有异常")
|
|
|
-// });
|
|
|
-//
|
|
|
-// // 不需要交易就不用开启私有频道了
|
|
|
-// if !is_trade {
|
|
|
-// return;
|
|
|
-// }
|
|
|
-//
|
|
|
-// // 开启私有频道
|
|
|
-// let (write_tx_private, write_rx_private) = futures_channel::mpsc::unbounded();
|
|
|
-//
|
|
|
-// // 开启公共连接
|
|
|
-// let is_shutdown_arc_c1 = is_shutdown_arc.clone();
|
|
|
-// let write_tx_am_private = Arc::new(Mutex::new(write_tx_private));
|
|
|
-// spawn(async move {
|
|
|
-// // 构建链接ws
|
|
|
-// let mut bg_private = BitgetSwapWs::new_label(name.clone(),
|
|
|
-// is_colo,
|
|
|
-// Some(parse_btree_map_to_bitget_swap_login(exchange_params)),
|
|
|
-// BitgetSwapWsType::Private);
|
|
|
-//
|
|
|
-// // 消费数据的函数
|
|
|
-// let core_arc_clone = core_arc.clone();
|
|
|
-// let run_symbol = symbols[0].clone();
|
|
|
-// let ct_val = core_arc_clone.lock().await.platform_rest.get_self_market().ct_val;
|
|
|
-// let fun = move |data: ResponseData| {
|
|
|
-// let core_arc_cc = core_arc_clone.clone();
|
|
|
-// let run_symbol_c = run_symbol.clone();
|
|
|
-//
|
|
|
-// async move {
|
|
|
-// on_private_data(core_arc_cc, ct_val, data, &run_symbol_c).await
|
|
|
-// }
|
|
|
-// };
|
|
|
-//
|
|
|
-// // 准备链接
|
|
|
-// bg_private.set_subscribe(vec![
|
|
|
-// BitgetSwapSubscribeType::PrOrders,
|
|
|
-// BitgetSwapSubscribeType::PrAccount,
|
|
|
-// BitgetSwapSubscribeType::PrPosition
|
|
|
-// ]);
|
|
|
-// bg_private.set_symbols(symbols.clone());
|
|
|
-// bg_private.ws_connect_async(is_shutdown_arc_c1, fun, &write_tx_am_private, write_rx_private).await.expect("bitget_usdt_swap 链接有异常")
|
|
|
-// });
|
|
|
-// }
|
|
|
-//
|
|
|
-// async fn on_private_data(core_arc_clone: Arc<Mutex<Core>>,
|
|
|
-// ct_val: Decimal,
|
|
|
-// response: ResponseData,
|
|
|
-// run_symbol: &String) {
|
|
|
-// let mut trace_stack = TraceStack::new(response.time, response.ins);
|
|
|
-// trace_stack.on_after_span_line();
|
|
|
-//
|
|
|
-// // public类型,目前只考虑订单流数据
|
|
|
-// match response.channel.as_str() {
|
|
|
-// // "account" => {
|
|
|
-// // let account = standard::handle_info::HandleSwapInfo::handle_account_info(BitgetSwap, &response, run_symbol);
|
|
|
-// // let mut core = core_arc_clone.lock().await;
|
|
|
-// // core.update_equity(account).await;
|
|
|
-// // },
|
|
|
-// // "positions" => {
|
|
|
-// // let mut positions = standard::handle_info::HandleSwapInfo::handle_position(BitgetSwap, &response, &ct_val);
|
|
|
-// //
|
|
|
-// // // bitget如果没有仓位不会给0,会给个空数组
|
|
|
-// // if positions.is_empty() {
|
|
|
-// // positions.push(Position {
|
|
|
-// // symbol: run_symbol.replace("_", "").to_uppercase(),
|
|
|
-// // margin_level: Default::default(),
|
|
|
-// // amount: Default::default(),
|
|
|
-// // frozen_amount: Default::default(),
|
|
|
-// // price: Default::default(),
|
|
|
-// // profit: Default::default(),
|
|
|
-// // position_mode: PositionModeEnum::Both,
|
|
|
-// // margin: Default::default(),
|
|
|
-// // });
|
|
|
-// // }
|
|
|
-// //
|
|
|
-// // let mut core = core_arc_clone.lock().await;
|
|
|
-// // core.update_position(positions).await;
|
|
|
-// // },
|
|
|
-// // "orders" => {
|
|
|
-// // trace_stack.set_source("gate_swap.orders".to_string());
|
|
|
-// // let orders = standard::handle_info::HandleSwapInfo::handle_order(BitgetSwap, response.clone(), ct_val.clone());
|
|
|
-// //
|
|
|
-// // let mut order_infos:Vec<OrderInfo> = Vec::new();
|
|
|
-// // for mut order in orders.order {
|
|
|
-// // if order.status == "NULL" {
|
|
|
-// // error!("bitget_usdt_swap 未识别的订单状态:{:?}", response);
|
|
|
-// //
|
|
|
-// // continue;
|
|
|
-// // }
|
|
|
-// //
|
|
|
-// // let order_info = OrderInfo::parse_order_to_order_info(&mut order);
|
|
|
-// // order_infos.push(order_info);
|
|
|
-// // }
|
|
|
-// //
|
|
|
-// // {
|
|
|
-// // let mut core = core_arc_clone.lock().await;
|
|
|
-// // core.update_order(order_infos, trace_stack).await;
|
|
|
-// // }
|
|
|
-// // },
|
|
|
-// "pong" => {}
|
|
|
-// _ => {
|
|
|
-// info!("bitget_usdt_swap 113 未知的订阅数据: {:?}", response);
|
|
|
-// }
|
|
|
-// }
|
|
|
-// }
|
|
|
-//
|
|
|
-// async fn on_public_data(_core_arc_clone: Arc<Mutex<Core>>,
|
|
|
-// _update_flag_u: &mut Decimal,
|
|
|
-// response: ResponseData) {
|
|
|
-// let mut trace_stack = TraceStack::new(response.time, response.ins);
|
|
|
-// trace_stack.on_after_span_line();
|
|
|
-//
|
|
|
-// // public类型,目前只考虑订单流数据
|
|
|
-// match response.channel.as_str() {
|
|
|
-// "books1" => {
|
|
|
-// // trace_stack.set_source("bitget_usdt_swap.books1".to_string());
|
|
|
-// // let special_depth = standard::handle_info::HandleSwapInfo::handle_special_depth(BitgetSwap, &response);
|
|
|
-// // trace_stack.on_after_format();
|
|
|
-// //
|
|
|
-// // on_special_depth(core_arc_clone, update_flag_u, &response.label, &mut trace_stack, &special_depth).await;
|
|
|
-// },
|
|
|
-// "pong" => {},
|
|
|
-// _ => {
|
|
|
-// info!("bitget_usdt_swap 125 未知的订阅数据");
|
|
|
-// info!(?response)
|
|
|
-// }
|
|
|
-// }
|
|
|
-// }
|
|
|
-//
|
|
|
-// fn parse_btree_map_to_bitget_swap_login(exchange_params: BTreeMap<String, String>) -> BitgetSwapLogin {
|
|
|
-// BitgetSwapLogin {
|
|
|
-// api_key: exchange_params.get("access_key").unwrap().clone(),
|
|
|
-// secret_key: exchange_params.get("secret_key").unwrap().clone(),
|
|
|
-// passphrase_key: exchange_params.get("pass_key").unwrap().clone(),
|
|
|
-// }
|
|
|
-// }
|
|
|
+use std::collections::BTreeMap;
|
|
|
+use std::sync::Arc;
|
|
|
+use std::sync::atomic::AtomicBool;
|
|
|
+use rust_decimal::Decimal;
|
|
|
+use tokio::spawn;
|
|
|
+use tokio::sync::Mutex;
|
|
|
+use tracing::{error, info};
|
|
|
+use tokio_tungstenite::tungstenite::Message;
|
|
|
+use exchanges::bitget_swap_ws::{BitgetSwapLogin, BitgetSwapSubscribeType, BitgetSwapWs, BitgetSwapWsType};
|
|
|
+use exchanges::response_base::ResponseData;
|
|
|
+use global::trace_stack::TraceStack;
|
|
|
+use standard::exchange::ExchangeEnum::BitgetSwap;
|
|
|
+use standard::exchange_struct_handler::ExchangeStructHandler;
|
|
|
+use standard::{Depth, OrderBook};
|
|
|
+use crate::core::Core;
|
|
|
+use crate::exchange_disguise::{on_depth, on_record, on_ticker, on_trade};
|
|
|
+use crate::model::OrderInfo;
|
|
|
+
|
|
|
+pub async fn reference_bitget_swap_run(is_shutdown_arc: Arc<AtomicBool>,
|
|
|
+ core_arc: Arc<Mutex<Core>>,
|
|
|
+ name: String,
|
|
|
+ symbols: Vec<String>,
|
|
|
+ is_colo: bool) {
|
|
|
+ spawn(async move {
|
|
|
+ // 开启公共频道
|
|
|
+ let (write_tx, write_rx) = futures_channel::mpsc::unbounded::<Message>();
|
|
|
+ let mut ws = BitgetSwapWs::new_label(name, is_colo, None, BitgetSwapWsType::Public);
|
|
|
+ ws.set_subscribe(vec![
|
|
|
+ BitgetSwapSubscribeType::PuBooks1,
|
|
|
+ BitgetSwapSubscribeType::PuTrade,
|
|
|
+ BitgetSwapSubscribeType::PuKline("1".to_string()),
|
|
|
+ ]);
|
|
|
+
|
|
|
+
|
|
|
+ // 读取数据
|
|
|
+ let core_arc_clone = Arc::clone(&core_arc);
|
|
|
+ let mut rest = core_arc_clone.lock().await.platform_rest.clone_box();
|
|
|
+ let multiplier = rest.get_self_market().multiplier;
|
|
|
+ let mut records = rest.get_record("1".to_string()).await.unwrap();
|
|
|
+ for record in records.iter_mut() {
|
|
|
+ let core_arc_clone = core_arc.clone();
|
|
|
+
|
|
|
+ on_record(core_arc_clone, record).await
|
|
|
+ }
|
|
|
+
|
|
|
+ let depth_asks = Arc::new(Mutex::new(Vec::new()));
|
|
|
+ let depth_bids = Arc::new(Mutex::new(Vec::new()));
|
|
|
+
|
|
|
+ let fun = move |data: ResponseData| {
|
|
|
+ // 在 async 块之前克隆 Arc
|
|
|
+ let core_arc_cc = core_arc_clone.clone();
|
|
|
+ let mul = multiplier.clone();
|
|
|
+
|
|
|
+ let depth_asks = Arc::clone(&depth_asks);
|
|
|
+ let depth_bids = Arc::clone(&depth_bids);
|
|
|
+
|
|
|
+ async move {
|
|
|
+ let mut depth_asks = depth_asks.lock().await;
|
|
|
+ let mut depth_bids = depth_bids.lock().await;
|
|
|
+ // 使用克隆后的 Arc,避免 move 语义
|
|
|
+ on_public_data(core_arc_cc, &mul, &data, &mut depth_asks, &mut depth_bids).await
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+ // 链接
|
|
|
+ let write_tx_am = Arc::new(Mutex::new(write_tx));
|
|
|
+ ws.set_symbols(symbols);
|
|
|
+ ws.ws_connect_async(is_shutdown_arc, fun, &write_tx_am, write_rx).await.expect("bitget_usdt_swap 链接有异常");
|
|
|
+ });
|
|
|
+}
|
|
|
+
|
|
|
+// 交易 bybit 合约 启动
|
|
|
+pub(crate) async fn bitget_swap_run(is_shutdown_arc: Arc<AtomicBool>,
|
|
|
+ core_arc: Arc<Mutex<Core>>,
|
|
|
+ name: String,
|
|
|
+ symbols: Vec<String>,
|
|
|
+ is_colo: bool,
|
|
|
+ exchange_params: BTreeMap<String, String>) {
|
|
|
+ spawn(async move {
|
|
|
+ let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
|
|
|
+ let auth = Some(parse_btree_map_to_bitget_swap_login(exchange_params));
|
|
|
+ let mut ws = BitgetSwapWs::new_label(name.clone(), is_colo, auth, BitgetSwapWsType::Private);
|
|
|
+ ws.set_subscribe(vec![
|
|
|
+ BitgetSwapSubscribeType::PrOrders,
|
|
|
+ BitgetSwapSubscribeType::PrAccount,
|
|
|
+ BitgetSwapSubscribeType::PrPosition,
|
|
|
+ ]);
|
|
|
+
|
|
|
+ let core_arc_clone_private = core_arc.clone();
|
|
|
+ let multiplier = core_arc_clone_private.lock().await.platform_rest.get_self_market().multiplier;
|
|
|
+ let run_symbol = symbols.clone()[0].clone();
|
|
|
+
|
|
|
+ // 挂起私有ws
|
|
|
+ let fun = move |data: ResponseData| {
|
|
|
+ // 在 async 块之前克隆 Arc
|
|
|
+ let core_arc_cc = core_arc_clone_private.clone();
|
|
|
+ let mul = multiplier.clone();
|
|
|
+ let rs = run_symbol.clone();
|
|
|
+
|
|
|
+ async move {
|
|
|
+ // 使用克隆后的 Arc,避免 move 语义
|
|
|
+ on_private_data(core_arc_cc, &mul, &rs, &data).await;
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+ // 链接
|
|
|
+ let write_tx_am = Arc::new(Mutex::new(write_tx));
|
|
|
+ ws.set_symbols(symbols);
|
|
|
+ ws.ws_connect_async(is_shutdown_arc, fun, &write_tx_am, write_rx).await.expect("bitget_usdt_swap 链接有异常");
|
|
|
+ });
|
|
|
+}
|
|
|
+
|
|
|
+async fn on_private_data(core_arc_clone: Arc<Mutex<Core>>, ct_val: &Decimal, run_symbol: &String, response: &ResponseData) {
|
|
|
+ let mut trace_stack = TraceStack::new(response.time, response.ins);
|
|
|
+ trace_stack.on_after_span_line();
|
|
|
+
|
|
|
+ match response.channel.as_str() {
|
|
|
+ "wallet" => {
|
|
|
+ let account = ExchangeStructHandler::account_info_handle(BitgetSwap, response, run_symbol);
|
|
|
+ let mut core = core_arc_clone.lock().await;
|
|
|
+ core.update_equity(account).await;
|
|
|
+ }
|
|
|
+ "order" => {
|
|
|
+ let orders = ExchangeStructHandler::order_handle(BitgetSwap, response, ct_val);
|
|
|
+ trace_stack.on_after_format();
|
|
|
+
|
|
|
+ let mut order_infos: Vec<OrderInfo> = Vec::new();
|
|
|
+ for mut order in orders.order {
|
|
|
+ if order.status == "NULL" {
|
|
|
+ error!("bitget_usdt_swap 未识别的订单状态:{:?}", response);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ if order.deal_amount != Decimal::ZERO {
|
|
|
+ info!("bitget order 消息原文:{:?}", response);
|
|
|
+ }
|
|
|
+
|
|
|
+ let order_info = OrderInfo::parse_order_to_order_info(&mut order);
|
|
|
+ order_infos.push(order_info);
|
|
|
+ }
|
|
|
+
|
|
|
+ let mut core = core_arc_clone.lock().await;
|
|
|
+ core.update_order(order_infos, trace_stack).await;
|
|
|
+ }
|
|
|
+ "position" => {
|
|
|
+ let positions = ExchangeStructHandler::position_handle(BitgetSwap, response, ct_val);
|
|
|
+ let mut core = core_arc_clone.lock().await;
|
|
|
+ core.update_position(positions).await;
|
|
|
+ }
|
|
|
+ _ => {
|
|
|
+ error!("未知推送类型");
|
|
|
+ error!(?response);
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+async fn on_public_data(core_arc: Arc<Mutex<Core>>, mul: &Decimal, response: &ResponseData, depth_asks: &mut Vec<OrderBook>, depth_bids: &mut Vec<OrderBook>) {
|
|
|
+ let mut trace_stack = TraceStack::new(response.time, response.ins);
|
|
|
+ trace_stack.on_after_span_line();
|
|
|
+
|
|
|
+ match response.channel.as_str() {
|
|
|
+ "orderbook" => {
|
|
|
+ trace_stack.set_source("bitget_usdt_swap.bookTicker".to_string());
|
|
|
+
|
|
|
+ let mut is_update = false;
|
|
|
+ if response.data_type == "delta" {
|
|
|
+ is_update = true;
|
|
|
+ }
|
|
|
+ let mut depth = ExchangeStructHandler::book_ticker_handle(BitgetSwap, &response, mul);
|
|
|
+ // 是增量更新
|
|
|
+ if is_update {
|
|
|
+ if depth.asks.len() != 0 {
|
|
|
+ depth_asks.clear();
|
|
|
+ depth_asks.append(&mut depth.asks);
|
|
|
+ } else if depth.bids.len() != 0 {
|
|
|
+ depth_bids.clear();
|
|
|
+ depth_bids.append(&mut depth.bids);
|
|
|
+ }
|
|
|
+
|
|
|
+ let result_depth = Depth {
|
|
|
+ time: depth.time,
|
|
|
+ symbol: depth.symbol,
|
|
|
+ asks: depth_asks.clone(),
|
|
|
+ bids: depth_bids.clone(),
|
|
|
+ };
|
|
|
+
|
|
|
+ trace_stack.on_after_format();
|
|
|
+ on_depth(core_arc, &response.label, &mut trace_stack, &result_depth).await;
|
|
|
+ }
|
|
|
+ // 全量
|
|
|
+ else {
|
|
|
+ trace_stack.on_after_format();
|
|
|
+ on_depth(core_arc, &response.label, &mut trace_stack, &depth).await;
|
|
|
+
|
|
|
+ depth_asks.clear();
|
|
|
+ depth_asks.append(&mut depth.asks);
|
|
|
+ depth_bids.clear();
|
|
|
+ depth_bids.append(&mut depth.bids);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ "trade" => {
|
|
|
+ trace_stack.set_source("bitget_usdt_swap.trade".to_string());
|
|
|
+
|
|
|
+ let mut trades = ExchangeStructHandler::trades_handle(BitgetSwap, response, mul);
|
|
|
+ trace_stack.on_after_format();
|
|
|
+
|
|
|
+ for trade in trades.iter_mut() {
|
|
|
+ let core_arc_clone = core_arc.clone();
|
|
|
+
|
|
|
+ on_trade(core_arc_clone, &response.label, &mut trace_stack, &trade).await;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ "tickers" => {
|
|
|
+ trace_stack.set_source("bitget_usdt_swap.tickers".to_string());
|
|
|
+ let ticker = ExchangeStructHandler::ticker_handle(BitgetSwap, response).await;
|
|
|
+ trace_stack.on_after_format();
|
|
|
+
|
|
|
+ on_ticker(core_arc, &mut trace_stack, &ticker).await;
|
|
|
+ }
|
|
|
+ // k线数据
|
|
|
+ "kline" => {
|
|
|
+ let mut records = ExchangeStructHandler::records_handle(BitgetSwap, &response);
|
|
|
+
|
|
|
+ if records.is_empty() {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ for record in records.iter_mut() {
|
|
|
+ let core_arc_clone = core_arc.clone();
|
|
|
+
|
|
|
+ on_record(core_arc_clone, record).await
|
|
|
+ }
|
|
|
+ }
|
|
|
+ _ => {
|
|
|
+ error!("未知推送类型");
|
|
|
+ error!(?response);
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+fn parse_btree_map_to_bitget_swap_login(exchange_params: BTreeMap<String, String>) -> BitgetSwapLogin {
|
|
|
+ BitgetSwapLogin {
|
|
|
+ api_key: exchange_params.get("access_key").unwrap().clone(),
|
|
|
+ secret_key: exchange_params.get("secret_key").unwrap().clone(),
|
|
|
+ passphrase_key: exchange_params.get("pass_key").unwrap().clone(),
|
|
|
+ }
|
|
|
+}
|