bybit_usdt_swap.rs 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287
  1. use std::cmp::Ordering;
  2. use std::collections::BTreeMap;
  3. use std::sync::Arc;
  4. use std::sync::atomic::AtomicBool;
  5. use std::time::Duration;
  6. use futures_util::StreamExt;
  7. use rust_decimal::Decimal;
  8. use tokio::{spawn, time};
  9. use tokio::sync::Mutex;
  10. use tracing::{error, info};
  11. use exchanges::bybit_swap_ws::{BybitSwapLogin, BybitSwapSubscribeType, BybitSwapWs, BybitSwapWsType};
  12. use exchanges::response_base::ResponseData;
  13. use global::trace_stack::TraceStack;
  14. use standard::exchange::ExchangeEnum::{BybitSwap};
  15. use standard::handle_info::{DepthParam, format_depth, make_special_depth};
  16. use standard::MarketOrder;
  17. use crate::model::{OrderInfo, OriginalTradeBy};
  18. use crate::quant::Quant;
  19. use crate::exchange_disguise::on_special_depth;
  20. // 1交易、0参考 bybit 合约 启动
  21. pub async fn bybit_swap_run(bool_v1: Arc<AtomicBool>,
  22. is_trade: bool,
  23. _quant_arc: Arc<Mutex<Quant>>,
  24. name: String,
  25. symbols: Vec<String>,
  26. is_colo: bool,
  27. exchange_params: BTreeMap<String, String>) {
  28. // 启动公共频道
  29. let (write_tx_public, write_rx_public) = futures_channel::mpsc::unbounded();
  30. let (read_tx_public, mut read_rx_public) = futures_channel::mpsc::unbounded();
  31. let mut ws_public = BybitSwapWs::new_label(name.clone(), is_colo, None, BybitSwapWsType::Public);
  32. ws_public.set_symbols(symbols.clone());
  33. ws_public.set_subscribe(vec![
  34. BybitSwapSubscribeType::PuOrderBook50
  35. ]);
  36. if is_trade {
  37. ws_public.set_subscribe(vec![
  38. BybitSwapSubscribeType::PuBlicTrade
  39. ]);
  40. }
  41. // 挂起公共ws
  42. let write_tx_am_public = Arc::new(Mutex::new(write_tx_public));
  43. let bool_clone_public = Arc::clone(&bool_v1);
  44. spawn(async move {
  45. ws_public.ws_connect_async(bool_clone_public,
  46. &write_tx_am_public,
  47. write_rx_public,
  48. read_tx_public).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  49. });
  50. // 消费数据
  51. let bot_arc_clone = _quant_arc.clone();
  52. // 接收public数据
  53. spawn(async move {
  54. // ticker
  55. let mut update_flag_u = Decimal::ZERO;
  56. let mut max_buy = Decimal::ZERO;
  57. let mut min_sell = Decimal::ZERO;
  58. let mut depth_asks: Vec<MarketOrder> = Vec::new();
  59. let mut depth_bids: Vec<MarketOrder> = Vec::new();
  60. loop {
  61. if let Some(public_data) = read_rx_public.next().await {
  62. on_public_data(bot_arc_clone.clone(),
  63. &mut update_flag_u,
  64. &mut max_buy,
  65. &mut min_sell,
  66. public_data,
  67. &mut depth_asks,
  68. &mut depth_bids).await;
  69. }
  70. }
  71. });
  72. let trade_symbols = symbols.clone();
  73. // 交易交易所需要启动私有ws
  74. if is_trade {
  75. let (write_tx_private, write_rx_private) = futures_channel::mpsc::unbounded();
  76. let (read_tx_private, mut read_rx_private) = futures_channel::mpsc::unbounded();
  77. let auth = Some(parse_btree_map_to_bybit_swap_login(exchange_params));
  78. let mut ws_private = BybitSwapWs::new_label(name.clone(), is_colo, auth, BybitSwapWsType::Private);
  79. ws_private.set_symbols(trade_symbols);
  80. ws_private.set_subscribe(vec![
  81. BybitSwapSubscribeType::PrPosition,
  82. BybitSwapSubscribeType::PrOrder,
  83. BybitSwapSubscribeType::PrWallet
  84. ]);
  85. // 挂起私有ws
  86. let write_tx_am_private = Arc::new(Mutex::new(write_tx_private));
  87. let bool_clone_private = Arc::clone(&bool_v1);
  88. spawn(async move {
  89. ws_private.ws_connect_async(bool_clone_private,
  90. &write_tx_am_private,
  91. write_rx_private,
  92. read_tx_private).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
  93. });
  94. // 消费数据
  95. let bot_arc_clone = _quant_arc.clone();
  96. // 接收private信息
  97. spawn(async move {
  98. let ct_val = bot_arc_clone.clone().lock().await.platform_rest.get_self_market().ct_val;
  99. let run_symbol = symbols.clone()[0].clone();
  100. loop {
  101. if let Some(private_data) = read_rx_private.next().await {
  102. on_private_data(bot_arc_clone.clone(),
  103. ct_val,
  104. private_data,
  105. run_symbol.clone()).await;
  106. }
  107. }
  108. });
  109. // 定时获取仓位信息
  110. let position_quant_clone = _quant_arc.clone();
  111. spawn(async move {
  112. let mut interval = time::interval(Duration::from_secs(30));
  113. loop {
  114. interval.tick().await;
  115. {
  116. let mut quant = position_quant_clone.lock().await;
  117. quant.update_position_rest_swap().await;
  118. }
  119. }
  120. });
  121. }
  122. }
  123. async fn on_private_data(bot_arc_clone: Arc<Mutex<Quant>>, ct_val: Decimal, data: ResponseData, run_symbol: String) {
  124. let mut trace_stack = TraceStack::default();
  125. trace_stack.on_after_network(data.time);
  126. trace_stack.on_before_quant();
  127. if data.code != "200".to_string() {
  128. return;
  129. }
  130. if data.channel == "wallet" {
  131. let account = standard::handle_info::HandleSwapInfo::handle_account_info(BybitSwap, data, run_symbol.clone());
  132. {
  133. let mut quant = bot_arc_clone.lock().await;
  134. quant.update_equity(account).await;
  135. }
  136. } else if data.channel == "order" {
  137. trace_stack.on_before_format();
  138. let orders = standard::handle_info::HandleSwapInfo::handle_order(BybitSwap, data.clone(), ct_val.clone());
  139. trace_stack.on_after_format();
  140. let mut order_infos:Vec<OrderInfo> = Vec::new();
  141. for order in orders.order {
  142. if order.status == "NULL" {
  143. continue;
  144. }
  145. let order_info = OrderInfo {
  146. symbol: "".to_string(),
  147. amount: order.amount.abs(),
  148. side: "".to_string(),
  149. price: order.price,
  150. client_id: order.custom_id,
  151. filled_price: order.avg_price,
  152. filled: order.deal_amount.abs(),
  153. order_id: order.id,
  154. local_time: 0,
  155. create_time: 0,
  156. status: order.status,
  157. fee: Default::default(),
  158. trace_stack: Default::default(),
  159. };
  160. order_infos.push(order_info);
  161. }
  162. {
  163. let mut quant = bot_arc_clone.lock().await;
  164. quant.update_order(order_infos, trace_stack);
  165. }
  166. } else if data.channel == "position" {
  167. let positions = standard::handle_info::HandleSwapInfo::handle_position(BybitSwap,data, ct_val.clone());
  168. {
  169. let mut quant = bot_arc_clone.lock().await;
  170. quant.update_position(positions).await;
  171. }
  172. }
  173. }
  174. async fn on_public_data(bot_arc_clone: Arc<Mutex<Quant>>, update_flag_u: &mut Decimal, max_buy: &mut Decimal, min_sell: &mut Decimal, data: ResponseData, depth_asks: &mut Vec<MarketOrder>, depth_bids: &mut Vec<MarketOrder>) {
  175. let mut trace_stack = TraceStack::default();
  176. trace_stack.on_after_network(data.time);
  177. trace_stack.on_before_quant();
  178. if data.code != "200".to_string() {
  179. return;
  180. }
  181. if data.channel == "orderbook" {
  182. let mut is_update = false;
  183. let data_type = data.data_type.clone();
  184. let label = data.label.clone();
  185. if data_type == "delta" {
  186. is_update = true;
  187. }
  188. trace_stack.on_before_format();
  189. let mut depth_format: DepthParam = format_depth(BybitSwap, data);
  190. // 是增量更新
  191. if is_update {
  192. update_order_book(depth_asks, depth_bids, depth_format.depth_asks, depth_format.depth_bids);
  193. } else { // 全量
  194. depth_asks.clear();
  195. depth_asks.append(&mut depth_format.depth_asks);
  196. depth_bids.clear();
  197. depth_bids.append(&mut depth_format.depth_bids);
  198. }
  199. let depth = make_special_depth(label.clone(), depth_asks, depth_bids, depth_format.t, depth_format.create_at);
  200. trace_stack.on_before_network(depth_format.create_at.clone());
  201. trace_stack.on_after_format();
  202. on_special_depth(bot_arc_clone, update_flag_u, label, trace_stack, depth).await;
  203. } else if data.channel == "trade" {
  204. let mut quant = bot_arc_clone.lock().await;
  205. let str = data.label.clone();
  206. if quant.is_update.contains_key(&data.label) && *quant.is_update.get(str.as_str()).unwrap(){
  207. *max_buy = Decimal::ZERO;
  208. *min_sell = Decimal::ZERO;
  209. quant.is_update.remove(str.as_str());
  210. }
  211. let trades: Vec<OriginalTradeBy> = serde_json::from_str(data.data.as_str()).unwrap();
  212. for trade in trades {
  213. if trade.p > *max_buy || *max_buy == Decimal::ZERO{
  214. *max_buy = trade.p
  215. }
  216. if trade.p < *min_sell || *min_sell == Decimal::ZERO{
  217. *min_sell = trade.p
  218. }
  219. }
  220. quant.max_buy_min_sell_cache.insert(data.label, vec![*max_buy, *min_sell]);
  221. }
  222. }
  223. fn parse_btree_map_to_bybit_swap_login(exchange_params: BTreeMap<String, String>) -> BybitSwapLogin {
  224. BybitSwapLogin {
  225. api_key: exchange_params.get("access_key").unwrap().clone(),
  226. secret_key: exchange_params.get("secret_key").unwrap().clone(),
  227. }
  228. }
  229. fn update_order_book(depth_asks: &mut Vec<MarketOrder>, depth_bids: &mut Vec<MarketOrder>, asks : Vec<MarketOrder>, bids: Vec<MarketOrder>) {
  230. for i in asks {
  231. let index_of_value = depth_asks.iter().position(|x| x.price == i.price);
  232. match index_of_value {
  233. Some(index) => {
  234. if i.amount == Decimal::ZERO {
  235. depth_asks.remove(index);
  236. } else {
  237. depth_asks[index].amount = i.amount.clone();
  238. }
  239. },
  240. None => {
  241. depth_asks.push(i.clone());
  242. },
  243. }
  244. }
  245. for i in bids {
  246. let index_of_value = depth_bids.iter().position(|x| x.price == i.price);
  247. match index_of_value {
  248. Some(index) => {
  249. if i.amount == Decimal::ZERO {
  250. depth_bids.remove(index);
  251. } else {
  252. depth_bids[index].amount = i.amount.clone();
  253. }
  254. },
  255. None => {
  256. depth_bids.push(i.clone());
  257. },
  258. }
  259. }
  260. depth_asks.sort_by(|a, b| a.price.partial_cmp(&b.price).unwrap_or(Ordering::Equal));
  261. depth_bids.sort_by(|a, b| b.price.partial_cmp(&a.price).unwrap_or(Ordering::Equal));
  262. // 限制总长度100
  263. depth_asks.truncate(100);
  264. depth_bids.truncate(100);
  265. }