Browse Source

参考、交易交易所提出

JiahengHe 2 years ago
parent
commit
3df58ebe12
4 changed files with 263 additions and 217 deletions
  1. 6 4
      src/main.rs
  2. 250 0
      strategy/src/exchange_disguise.rs
  3. 2 1
      strategy/src/lib.rs
  4. 5 212
      strategy/src/quant.rs

+ 6 - 4
src/main.rs

@@ -7,10 +7,12 @@ use std::sync::Arc;
 use tokio::sync::{mpsc, Mutex};
 use tokio::try_join;
 use tracing::{error, info};
+use exchanges::gate_swap_rest::GateSwapRest;
+use standard::exchange::ExchangeEnum::GateSwap;
 use standard::Order;
 use strategy::model::OrderInfo;
 use strategy::params::Params;
-use strategy::quant;
+use strategy::{exchange_disguise, quant};
 use strategy::quant::Quant;
 
 #[tokio::main]
@@ -32,7 +34,7 @@ async fn main() {
     let (order_sender, mut order_receiver) = mpsc::channel::<Order>(100);
     let (error_sender, mut error_receiver) = mpsc::channel::<Error>(100);
 
-    let mut quant_obj = Quant::new(params.clone(), exchange_params.clone(), order_sender.clone(), error_sender.clone()).await;
+    let mut quant_obj = Quant::new(GateSwap, params.clone(), exchange_params.clone(), order_sender.clone(), error_sender.clone()).await;
     let trade_name = quant_obj.trade_name.clone();
     let mut quant_arc = Arc::new(Mutex::new(quant_obj));
 
@@ -40,9 +42,9 @@ async fn main() {
     quant_arc.lock().await.before_trade().await;
     let ref_name = quant_arc.lock().await.ref_name[0].clone();
     // 参考交易所
-    quant::run_refer(quant_arc.clone(), ref_name, params.ref_pair.clone(), exchange_params.clone()).await;
+    exchange_disguise::run_reference_exchange("binance".to_string(), quant_arc.clone(), ref_name, params.ref_pair.clone(), exchange_params.clone()).await;
     // 交易交易所
-    quant::run_transaction(quant_arc.clone(), trade_name, vec![params.pair.clone()], exchange_params.clone()).await;
+    exchange_disguise::run_transactional_exchange("gate".to_string(), quant_arc.clone(), trade_name, vec![params.pair.clone()], exchange_params.clone()).await;
     // 启动定期触发的系统逻辑
     quant::on_timer(quant_arc.clone());
     // 启动策略逻辑

+ 250 - 0
strategy/src/exchange_disguise.rs

@@ -0,0 +1,250 @@
+use std::collections::BTreeMap;
+use std::str::FromStr;
+use std::sync::Arc;
+use std::time::Duration;
+use rust_decimal::Decimal;
+use serde_json::Value;
+use tokio::sync::mpsc::{channel};
+use tokio::sync::Mutex;
+use tokio::time::sleep;
+use tracing::{error, info};
+use exchanges::binance_swap_ws::{BinanceSubscribeType, BinanceSwapWs, BinanceWsType};
+use exchanges::gate_swap_rest::GateSwapRest;
+use exchanges::gate_swap_ws::{GateSubscribeType, GateSwapWs, GateWsType};
+use standard::exchange::ExchangeEnum::{BinanceSwap, GateSwap};
+use standard::SpecialTicker;
+use crate::model::{OrderInfo, OriginalTicker, OriginalTradeBa, OriginalTradeGa};
+use crate::quant::Quant;
+
+// 交易交易所启动
+pub async fn run_transactional_exchange(exchange_name: String, quant_arc: Arc<Mutex<Quant>>, name: String, symbols: Vec<String>, exchange_params: BTreeMap<String, String>){
+    match exchange_name.as_str() {
+        "gate" => {
+            transactional_gate_run(quant_arc, name, symbols, exchange_params).await;
+        }
+        _ => {
+            panic!("参数错误!")
+        }
+    }
+}
+
+// 参考交易所启动
+pub async fn run_reference_exchange(exchange_name: String, quant_arc: Arc<Mutex<Quant>>, name: String, symbols: Vec<String>, exchange_params: BTreeMap<String, String>){
+    match exchange_name.as_str() {
+        "binance" => {
+            reference_binance_run(quant_arc, name, symbols, exchange_params).await;
+        }
+        _ => {
+            panic!("参数错误!")
+        }
+    }
+}
+
+// 交易 gate 启动
+async fn transactional_gate_run(quant_arc: Arc<Mutex<Quant>>, name: String, symbols: Vec<String>, exchange_params: BTreeMap<String, String>){
+    let (tx, mut rx) = channel(100);
+    let mut gate_exc = GateSwapRest::new(false, exchange_params.clone());
+    // 获取user_id
+    let res_data = gate_exc.wallet_fee().await;
+    assert_eq!(res_data.code, "200", "获取gate交易所参数 user_id 失败, 启动失败!");
+
+    let wallet_obj :Value = serde_json::from_str(&res_data.data).unwrap();
+    info!(?wallet_obj);
+    let user_id = wallet_obj["user_id"].to_string();
+    let symbols_one = symbols.clone();
+    // 获取乘数(计价货币兑换为结算货币的乘数)
+    let response = gate_exc.get_market_details("usdt".to_string()).await;
+    assert_eq!(response.code, "200", "获取gate交易所参数 multiplier 失败, 启动失败!");
+
+    let contract_obj :Vec<Value> = parse_json_array(&response.data).unwrap();
+    let mut multiplier =  Decimal::ZERO;
+    for val in contract_obj {
+        if symbols[0].to_uppercase() == val["name"].as_str().unwrap(){
+            let num = val["quanto_multiplier"].as_str().unwrap();
+            multiplier = Decimal::from_str(num).unwrap();
+        }
+    }
+    assert_ne!(multiplier, Decimal::ZERO, "获取gate交易所参数 multiplier 为0!");
+
+    tokio::spawn( async move {
+        let mut gate_exc = GateSwapWs::new_label(name, false, exchange_params,
+                                                 GateWsType::PublicAndPrivate("usdt".to_string()), tx);
+        gate_exc.set_subscribe(vec![
+            GateSubscribeType::PuFuturesTrades,
+            GateSubscribeType::PuFuturesOrderBook,
+            GateSubscribeType::PrFuturesOrders(user_id.clone()),
+            GateSubscribeType::PrFuturesPositions(user_id.clone()),
+            GateSubscribeType::PrFuturesBalances(user_id.clone()),
+        ]);
+        gate_exc.custom_subscribe(symbols_one).await;
+    });
+    tokio::spawn(async move {
+        let bot_arc_clone = Arc::clone(&quant_arc);
+        let run_symbol = symbols.clone()[0].clone();
+        // trade
+        let mut max_buy = Decimal::ZERO;
+        let mut min_sell = Decimal::ZERO;
+        loop {
+            sleep(Duration::from_millis(1)).await;
+            match rx.recv().await {
+                Some(data) => {
+                    if data.code != "200".to_string() {
+                        continue;
+                    }
+                    if data.channel == "futures.order_book" {
+                        let depth = standard::gate_handle::handle_special_depth(data);
+                        {
+                            let mut quant = bot_arc_clone.lock().await;
+                            quant._update_ticker(depth.ticker.clone(), depth.name.clone());
+                            quant._update_depth(depth.depth.clone(), depth.name.clone());
+                            quant.local_depths.insert(depth.name, depth.depth);
+                        }
+                    } else if data.channel == "futures.balances" {
+                        let account = standard::handle_info::HandleSwapInfo::handle_account_info(GateSwap, data, run_symbol.clone());
+                        {
+                            let mut quant = bot_arc_clone.lock().await;
+                            quant.update_equity(account);
+                        }
+                    } else if data.channel == "futures.orders" {
+                        let orders = standard::handle_info::HandleSwapInfo::handle_order(GateSwap, data.clone(), multiplier.clone());
+                        let mut order_infos:Vec<OrderInfo> = Vec::new();
+                        for order in orders.order {
+                            let order_info = OrderInfo {
+                                symbol: "".to_string(),
+                                amount: order.amount.abs(),
+                                side: "".to_string(),
+                                price: order.price,
+                                client_id: order.custom_id,
+                                filled_price: order.avg_price,
+                                filled: order.deal_amount.abs(),
+                                order_id: order.id,
+                                local_time: 0,
+                                create_time: 0,
+                                status: order.status,
+                                fee: Default::default(),
+                            };
+                            order_infos.push(order_info);
+                        }
+                        {
+                            let mut quant = bot_arc_clone.lock().await;
+                            quant.update_order(order_infos);
+                        }
+                    } else if data.channel == "futures.positions" {
+                        let positions = standard::handle_info::HandleSwapInfo::handle_position(GateSwap,data, multiplier.clone());
+                        {
+                            let mut quant = bot_arc_clone.lock().await;
+                            quant.update_position(positions);
+                        }
+                    } else if data.channel == "futures.trades" {
+                        let mut quant = bot_arc_clone.lock().await;
+                        let str = data.label.clone();
+                        if quant.is_update.contains_key(&data.label) && *quant.is_update.get(str.as_str()).unwrap(){
+                            max_buy = Decimal::ZERO;
+                            min_sell = Decimal::ZERO;
+                            quant.is_update.remove(str.as_str());
+                        }
+                        let trades: Vec<OriginalTradeGa> = serde_json::from_str(data.data.as_str()).unwrap();
+                        for trade in trades {
+                            if trade.price > max_buy || max_buy == Decimal::ZERO{
+                                max_buy = trade.price
+                            }
+                            if trade.price < min_sell || min_sell == Decimal::ZERO{
+                                min_sell = trade.price
+                            }
+                        }
+                        quant.max_buy_min_sell_cache.insert(data.label, vec![max_buy, min_sell]);
+                    }
+                },
+                None => {
+                    error!("交易交易所通道错误");
+                    break;
+                }
+            }
+        }
+    });
+}
+
+// 参考 币安 启动
+async fn reference_binance_run(quant_arc: Arc<Mutex<Quant>>, name: String, symbols: Vec<String>, exchange_params: BTreeMap<String, String>){
+    let (tx, mut rx) = channel(100);
+    tokio::spawn( async move {
+        let mut ba_exc = BinanceSwapWs::new_label(name, false, exchange_params, BinanceWsType::PublicAndPrivate, tx);
+        ba_exc.set_subscribe(vec![
+            BinanceSubscribeType::PuBookTicker,
+            BinanceSubscribeType::PuAggTrade
+        ]);
+        ba_exc.custom_subscribe(symbols.clone()).await;
+    });
+    tokio::spawn(async move {
+        // trade
+        let mut max_buy = Decimal::ZERO;
+        let mut min_sell = Decimal::ZERO;
+        // ticker
+        let mut update_flag_u = 0i64;
+        let bot_arc_clone = Arc::clone(&quant_arc);
+        loop {
+            sleep(Duration::from_millis(1)).await;
+            match rx.recv().await {
+                Some(data) => {
+                    if data.code != "200".to_string() {
+                        continue;
+                    }
+                    if data.channel == "aggTrade" {
+                        let trade: OriginalTradeBa = serde_json::from_str(data.data.as_str()).unwrap();
+                        let str = data.label.clone();
+                        let mut quant = bot_arc_clone.lock().await;
+                        if quant.is_update.contains_key(&data.label) && *quant.is_update.get(str.as_str()).unwrap(){
+                            max_buy = Decimal::ZERO;
+                            min_sell = Decimal::ZERO;
+                            quant.is_update.remove(str.as_str());
+                        }
+                        if trade.p > max_buy || max_buy == Decimal::ZERO{
+                            max_buy = trade.p
+                        }
+                        if trade.p < min_sell || min_sell == Decimal::ZERO{
+                            min_sell = trade.p
+                        }
+                        {
+                            quant.max_buy_min_sell_cache.insert(data.label, vec![max_buy, min_sell]);
+                        }
+                    } else if data.channel == "bookTicker" {
+                        let ticker: OriginalTicker = serde_json::from_str(data.data.as_str()).unwrap();
+                        if ticker.u > update_flag_u {
+                            {
+                                let mut quant = bot_arc_clone.lock().await;
+                                quant._update_ticker(SpecialTicker{
+                                    sell: ticker.a.clone(),
+                                    buy: ticker.b.clone(),
+                                    mid_price: Default::default(),
+                                }, data.label.clone());
+                                let depth = vec![ticker.b, ticker.B, ticker.a, ticker.A];
+                                quant._update_depth(depth.clone(), data.label.clone());
+                                quant.local_depths.insert(data.label.clone(), depth);
+                            }
+                        } else {
+                            update_flag_u = ticker.u;
+                        }
+                    } else if data.channel == "depth" {
+                        // TODO: self._check_update_u(msg['u']):
+                        let depth = standard::handle_info::HandleSwapInfo::handle_special_depth(BinanceSwap, data);
+                        {
+                            let mut quant = bot_arc_clone.lock().await;
+                            quant._update_ticker(depth.ticker.clone(), depth.name.clone());
+                            quant._update_depth(depth.depth.clone(), depth.name.clone());
+                            quant.local_depths.insert(depth.name, depth.depth);
+                        }
+                    }
+                },
+                None => {
+                    error!("参考交易所通道错误");
+                    break;
+                }
+            }
+        }
+    });
+}
+
+
+fn parse_json_array(json: &str) -> serde_json::Result<Vec<Value>> {
+    serde_json::from_str(json)
+}

+ 2 - 1
strategy/src/lib.rs

@@ -3,4 +3,5 @@ pub mod quant;
 pub mod model;
 mod strategy;
 mod predictor;
-mod utils;
+mod utils;
+pub mod exchange_disguise;

+ 5 - 212
strategy/src/quant.rs

@@ -19,8 +19,8 @@ use exchanges::gate_swap_rest::GateSwapRest;
 use exchanges::gate_swap_ws::{GateSubscribeType, GateSwapWs, GateWsType};
 use global::public_params::{ASK_PRICE_INDEX, BID_PRICE_INDEX, LENGTH};
 use standard::{Account, Market, Order, OrderCommand, Platform, Position, PositionModeEnum, SpecialDepth, SpecialTicker, Ticker};
-use standard::exchange::Exchange;
-use standard::exchange::ExchangeEnum::GateSwap;
+use standard::exchange::{Exchange, ExchangeEnum};
+use standard::exchange::ExchangeEnum::{BinanceSwap, GateSwap};
 
 use crate::model::{LocalPosition, OrderInfo, OriginalTicker, OriginalTradeBa, OriginalTradeGa, TraderMsg};
 use crate::params::Params;
@@ -110,7 +110,7 @@ struct MarketData{
 }
 
 impl Quant {
-    pub async fn new(params: Params, exchange_params: BTreeMap<String, String>,  order_sender: Sender<Order>, error_sender: Sender<Error>) -> Quant {
+    pub async fn new(exchange_rest: ExchangeEnum, params: Params, exchange_params: BTreeMap<String, String>,  order_sender: Sender<Order>, error_sender: Sender<Error>) -> Quant {
         let symbol = params.pair.clone();
         let pairs: Vec<&str> = params.pair.split('_').collect();
         let mut quant_obj = Quant {
@@ -188,7 +188,7 @@ impl Quant {
                 max_notional: Default::default(),
                 ct_val: Default::default(),
             },
-            platform_rest: Exchange::new(GateSwap, symbol, false, exchange_params, order_sender, error_sender).await,
+            platform_rest: Exchange::new(exchange_rest, symbol, false, exchange_params, order_sender, error_sender).await,
             max_buy_min_sell_cache: Default::default(),
             local_depths: Default::default(),
             is_update: Default::default()
@@ -251,6 +251,7 @@ impl Quant {
     pub async fn handle_signals(quant_arc: Arc<Mutex<Quant>>, mut rx: Receiver<Order>) {
         tokio::spawn(async move{
             loop {
+                sleep(Duration::from_millis(1)).await;
                 match rx.try_recv() {
                     Ok(val)=>{
                         // 只处理这两种订单回执
@@ -1130,214 +1131,6 @@ pub fn run_strategy(quant_arc: Arc<Mutex<Quant>>) -> JoinHandle<()>{
         }
     });
 }
-pub async fn run_transaction(quant_arc: Arc<Mutex<Quant>>, name: String, symbols: Vec<String>, exchange_params: BTreeMap<String, String>) {
-    let (tx, mut rx) = channel(100);
-    let mut gate_exc = GateSwapRest::new(false, exchange_params.clone());
-    // 获取user_id
-    let res_data = gate_exc.wallet_fee().await;
-    if res_data.code != "200"{
-        error!("获取gate交易所参数 user_id 失败, 交易交易所启动失败!");
-        info!(?res_data);
-        return;
-    }
-    let wallet_obj :Value = serde_json::from_str(&res_data.data).unwrap();
-    info!(?wallet_obj);
-    let user_id = wallet_obj["user_id"].to_string();
-    let symbols_one = symbols.clone();
-    // 获取乘数(计价货币兑换为结算货币的乘数)
-    let response = gate_exc.get_market_details("usdt".to_string()).await;
-    if(response.code != "200"){
-        error!("获取gate交易所参数 multiplier 失败, 交易交易所启动失败!");
-        return;
-    }
-    let contract_obj :Vec<Value> = parse_json_array(&response.data).unwrap();
-    let mut multiplier =  Decimal::ZERO;
-    for val in contract_obj {
-        if symbols[0].to_uppercase() == val["name"].as_str().unwrap(){
-            let num = val["quanto_multiplier"].as_str().unwrap();
-            multiplier = Decimal::from_str(num).unwrap();
-        }
-    }
-    if multiplier == Decimal::ZERO {
-        error!("获取gate交易所参数 multiplier 为0!");
-        return;
-    }
-    tokio::spawn( async move {
-        let mut gate_exc = GateSwapWs::new_label(name, false, exchange_params,
-                                               GateWsType::PublicAndPrivate("usdt".to_string()), tx);
-        gate_exc.set_subscribe(vec![
-            GateSubscribeType::PuFuturesTrades,
-            GateSubscribeType::PuFuturesOrderBook,
-            GateSubscribeType::PrFuturesOrders(user_id.clone()),
-            GateSubscribeType::PrFuturesPositions(user_id.clone()),
-            GateSubscribeType::PrFuturesBalances(user_id.clone()),
-        ]);
-        gate_exc.custom_subscribe(symbols_one.clone()).await;
-    });
-
-    tokio::spawn(async move {
-        let bot_arc_clone = Arc::clone(&quant_arc);
-        let run_symbol = symbols.clone()[0].clone();
-        // trade
-        let mut max_buy = Decimal::ZERO;
-        let mut min_sell = Decimal::ZERO;
-        loop {
-            match rx.recv().await {
-                Some(data) => {
-                    if data.code != "200".to_string() {
-                        continue;
-                    }
-                    if data.channel == "futures.order_book" {
-                        let depth = standard::gate_handle::handle_special_depth(data);
-                        {
-                            let mut quant = bot_arc_clone.lock().await;
-                            quant._update_ticker(depth.ticker.clone(), depth.name.clone());
-                            quant._update_depth(depth.depth.clone(), depth.name.clone());
-                            quant.local_depths.insert(depth.name, depth.depth);
-                        }
-                    } else if data.channel == "futures.balances" {
-                        let account = standard::gate_handle::handle_account_info(data, run_symbol.clone());
-                        {
-                            let mut quant = bot_arc_clone.lock().await;
-                            quant.update_equity(account);
-                        }
-                    } else if data.channel == "futures.orders" {
-                        let orders = standard::gate_handle::handle_order(data.clone(), multiplier.clone());
-                        let mut order_infos:Vec<OrderInfo> = Vec::new();
-                        for order in orders.order {
-                            let mut order_info = OrderInfo {
-                                symbol: "".to_string(),
-                                amount: order.amount.abs(),
-                                side: "".to_string(),
-                                price: order.price,
-                                client_id: order.custom_id,
-                                filled_price: order.avg_price,
-                                filled: order.deal_amount.abs(),
-                                order_id: order.id,
-                                local_time: 0,
-                                create_time: 0,
-                                status: order.status,
-                                fee: Default::default(),
-                            };
-                            order_infos.push(order_info);
-                        }
-                        {
-                            let mut quant = bot_arc_clone.lock().await;
-                            quant.update_order(order_infos);
-                        }
-                    } else if data.channel == "futures.positions" {
-                        let positions = standard::gate_handle::handle_position(data, multiplier.clone());
-                        {
-                            let mut quant = bot_arc_clone.lock().await;
-                            quant.update_position(positions);
-                        }
-                    } else if data.channel == "futures.trades" {
-                        let mut quant = bot_arc_clone.lock().await;
-                        let str = data.label.clone();
-                        if quant.is_update.contains_key(&data.label) && *quant.is_update.get(str.as_str()).unwrap(){
-                            max_buy = Decimal::ZERO;
-                            min_sell = Decimal::ZERO;
-                            quant.is_update.remove(str.as_str());
-                        }
-                        let trades: Vec<OriginalTradeGa> = serde_json::from_str(data.data.as_str()).unwrap();
-                        for trade in trades {
-                            if trade.price > max_buy || max_buy == Decimal::ZERO{
-                                max_buy = trade.price
-                            }
-                            if trade.price < min_sell || min_sell == Decimal::ZERO{
-                                min_sell = trade.price
-                            }
-                        }
-                        quant.max_buy_min_sell_cache.insert(data.label, vec![max_buy, min_sell]);
-                    }
-                },
-                None => {
-                    error!("交易交易所通道错误");
-                    break;
-                }
-            }
-        }
-    });
-}
-
-// 启动参考交易所
-pub async fn run_refer(quant_arc: Arc<Mutex<Quant>>, name: String, symbol: Vec<String>, exchange_param: BTreeMap<String, String>) {
-    let (tx, mut rx) = channel(100);
-    tokio::spawn( async move {
-        let mut ba_exc = BinanceSwapWs::new_label(name, false, exchange_param, BinanceWsType::PublicAndPrivate, tx);
-        ba_exc.set_subscribe(vec![
-            BinanceSubscribeType::PuBookTicker,
-            BinanceSubscribeType::PuAggTrade
-        ]);
-        ba_exc.custom_subscribe(symbol.clone()).await;
-    });
-    tokio::spawn(async move {
-        // trade
-        let mut max_buy = Decimal::ZERO;
-        let mut min_sell = Decimal::ZERO;
-        // ticker
-        let mut update_flag_u = 0i64;
-        let bot_arc_clone = Arc::clone(&quant_arc);
-        loop {
-            match rx.recv().await {
-                Some(data) => {
-                    if data.code != "200".to_string() {
-                        continue;
-                    }
-                    if data.channel == "aggTrade" {
-                        let trade: OriginalTradeBa = serde_json::from_str(data.data.as_str()).unwrap();
-                        let str = data.label.clone();
-                        let mut quant = bot_arc_clone.lock().await;
-                        if quant.is_update.contains_key(&data.label) && *quant.is_update.get(str.as_str()).unwrap(){
-                            max_buy = Decimal::ZERO;
-                            min_sell = Decimal::ZERO;
-                            quant.is_update.remove(str.as_str());
-                        }
-                        if trade.p > max_buy || max_buy == Decimal::ZERO{
-                            max_buy = trade.p
-                        }
-                        if trade.p < min_sell || min_sell == Decimal::ZERO{
-                            min_sell = trade.p
-                        }
-                        {
-                            quant.max_buy_min_sell_cache.insert(data.label, vec![max_buy, min_sell]);
-                        }
-                    } else if data.channel == "bookTicker" {
-                        let ticker: OriginalTicker = serde_json::from_str(data.data.as_str()).unwrap();
-                        if ticker.u > update_flag_u {
-                            {
-                                let mut quant = bot_arc_clone.lock().await;
-                                quant._update_ticker(SpecialTicker{
-                                    sell: ticker.a.clone(),
-                                    buy: ticker.b.clone(),
-                                    mid_price: Default::default(),
-                                }, data.label.clone());
-                                let depth = vec![ticker.b, ticker.B, ticker.a, ticker.A];
-                                quant._update_depth(depth.clone(), data.label.clone());
-                                quant.local_depths.insert(data.label.clone(), depth);
-                            }
-                        } else {
-                            update_flag_u = ticker.u;
-                        }
-                    } else if data.channel == "depth" {
-                        // TODO: self._check_update_u(msg['u']):
-                        let depth = standard::binance_handle::handle_special_depth(data);
-                        {
-                            let mut quant = bot_arc_clone.lock().await;
-                            quant._update_ticker(depth.ticker.clone(), depth.name.clone());
-                            quant._update_depth(depth.depth.clone(), depth.name.clone());
-                            quant.local_depths.insert(depth.name, depth.depth);
-                        }
-                    }
-                },
-                None => {
-                    error!("参考交易所通道错误");
-                    break;
-                }
-            }
-        }
-    });
-}
 
 // 定期触发的系统逻辑
 pub fn on_timer(quant_arc: Arc<Mutex<Quant>>) -> JoinHandle<()> {