| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672 |
- use std::collections::{BTreeMap};
- use exchanges::bitget_swap_rest::BitgetSwapRest;
- use std::io::{Error, ErrorKind};
- use tokio::sync::mpsc::Sender;
- use std::str::FromStr;
- use async_trait::async_trait;
- use futures::stream::FuturesUnordered;
- use futures::TryStreamExt;
- use rust_decimal::{Decimal, MathematicalOps};
- use rust_decimal::prelude::ToPrimitive;
- use rust_decimal_macros::dec;
- use serde_json::{json, Value};
- use tokio::spawn;
- use tokio::time::Instant;
- use tracing::{error, info};
- use global::trace_stack::TraceStack;
- use crate::exchange::ExchangeEnum;
- use crate::{Account, Market, Order, OrderCommand, Platform, Position, PositionModeEnum, Ticker, utils};
- #[allow(dead_code)]
- #[derive(Clone)]
- pub struct BitgetSwap {
- exchange: ExchangeEnum,
- symbol: String,
- is_colo: bool,
- params: BTreeMap<String, String>,
- request: BitgetSwapRest,
- market: Market,
- order_sender: Sender<Order>,
- error_sender: Sender<Error>,
- }
- impl BitgetSwap {
- pub async fn new(symbol: String, is_colo: bool, params: BTreeMap<String, String>, order_sender: Sender<Order>, error_sender: Sender<Error>) -> BitgetSwap {
- let market = Market::new();
- let mut bitget_swap = BitgetSwap {
- exchange: ExchangeEnum::BitgetSwap,
- symbol: symbol.to_uppercase(),
- is_colo,
- params: params.clone(),
- request: BitgetSwapRest::new(is_colo, params.clone()),
- market,
- order_sender,
- error_sender,
- };
- bitget_swap.market = BitgetSwap::get_market(&mut bitget_swap).await.unwrap();
- // 修改持仓模式
- let mode_result = bitget_swap.set_dual_mode("", true).await;
- match mode_result {
- Ok(ok) => {
- info!("BitgetSwap:设置持仓模式成功!{:?}", ok);
- }
- Err(error) => {
- error!("BitgetSwap:设置持仓模式失败!{:?}", error)
- }
- }
- // 设置持仓杠杆
- // let lever_rate_result = bitget_swap.set_dual_leverage("10").await;
- // match lever_rate_result {
- // Ok(ok) => {
- // info!("BitgetSwap:设置持仓杠杆成功!{:?}", ok);
- // }
- // Err(error) => {
- // error!("BitgetSwap:设置持仓杠杆失败!{:?}", error)
- // }
- // }
- return bitget_swap;
- }
- }
- #[async_trait]
- impl Platform for BitgetSwap {
- fn clone_box(&self) -> Box<dyn Platform + Send + Sync> { Box::new(self.clone()) }
- fn get_self_exchange(&self) -> ExchangeEnum { ExchangeEnum::BitgetSwap }
- fn get_self_symbol(&self) -> String { self.symbol.clone() }
- fn get_self_is_colo(&self) -> bool { self.is_colo }
- fn get_self_params(&self) -> BTreeMap<String, String> { self.params.clone() }
- fn get_self_market(&self) -> Market { self.market.clone() }
- fn get_request_delays(&self) -> Vec<i64> {
- // self.request.get_delays()
- vec![]
- }
- fn get_request_avg_delay(&self) -> Decimal {
- // self.request.get_avg_delay()
- Decimal::ZERO
- }
- fn get_request_max_delay(&self) -> i64 { 0 }
- async fn get_server_time(&mut self) -> Result<String, Error> {
- let res_data = self.request.get_server_time().await;
- if res_data.code == 200 {
- let res_data_json = res_data.data;
- let result = res_data_json["serverTime"].as_str().unwrap().to_string();
- Ok(result)
- } else {
- Err(Error::new(ErrorKind::Other, res_data.to_string()))
- }
- }
- async fn get_account(&mut self) -> Result<Account, Error> {
- let response = self.request.get_account_info().await;
- if response.code == 200 {
- for data in response.data.as_array().unwrap() {
- if data["marginCoin"].as_str().unwrap() != "USDT" {
- continue
- }
- // 格式化account信息
- let mut account = Account {
- coin: data["marginCoin"].to_string(),
- balance: Decimal::from_str(data["accountEquity"].as_str().unwrap()).unwrap(),
- available_balance: Decimal::from_str(data["available"].as_str().unwrap()).unwrap(),
- frozen_balance: Default::default(),
- stocks: Default::default(),
- available_stocks: Default::default(),
- frozen_stocks: Default::default(),
- };
- account.frozen_balance = account.balance - account.available_balance;
- return Ok(account)
- }
- Err(Error::new(ErrorKind::NotFound, format!("bitget_usdt_swap 未能找到USDT账户:{}。", response.to_string())))
- } else {
- Err(Error::new(ErrorKind::Other, response.to_string()))
- }
- }
- async fn get_spot_account(&mut self) -> Result<Vec<Account>, Error> {
- Err(Error::new(ErrorKind::NotFound, "bitget_swap get_spot_account:该交易所方法未实现".to_string()))
- }
- async fn get_position(&mut self) -> Result<Vec<Position>, Error> { Err(Error::new(ErrorKind::NotFound, "bitget_swap get_position:该交易所方法未实现".to_string())) }
- async fn get_positions(&mut self) -> Result<Vec<Position>, Error> {
- let params = json!({
- "productType": "USDT-FUTURES",
- "marginCoin": "USDT"
- });
- let response = self.request.get_all_position(params).await;
- info!(?response);
- if response.code != 200 {
- return Err(Error::new(ErrorKind::NotFound, format!("bitget_swap 获取仓位异常{:?}", response).to_string()))
- }
- // 正常处理持仓信息
- let mut positions: Vec<Position> = vec![];
- if response.data.is_null() {
- return Ok(positions)
- }
- let positions_json = response.data.as_array().unwrap();
- for position_json in positions_json {
- let symbol = position_json["symbol"].as_str().unwrap().to_string();
- let margin_level = Decimal::from_str(position_json["leverage"].as_str().unwrap()).unwrap();
- let amount = Decimal::from_str(position_json["total"].as_str().unwrap()).unwrap();
- let frozen_amount = Decimal::from_str(position_json["locked"].as_str().unwrap()).unwrap();
- let price = Decimal::from_str(position_json["openPriceAvg"].as_str().unwrap()).unwrap();
- let profit = Decimal::from_str(position_json["unrealizedPL"].as_str().unwrap()).unwrap();
- let position_mode = match position_json["posMode"].as_str().unwrap() {
- "hedge_mode" => {
- match position_json["holdSide"].as_str().unwrap() {
- "short" => {
- PositionModeEnum::Short
- }
- "long" => {
- PositionModeEnum::Long
- },
- _ => {
- panic!("bitget_usdt_swap: 未知的持仓模式与持仓方向: {}, {}",
- position_json["posMode"].as_str().unwrap(), position_json["holdSide"].as_str().unwrap())
- }
- }
- },
- "one_way_mode" => {
- PositionModeEnum::Both
- },
- _ => {
- panic!("bitget_usdt_swap: 未知的持仓模式: {}", position_json["posMode"].as_str().unwrap())
- }
- };
- let margin = Decimal::from_str(position_json["marginSize"].as_str().unwrap()).unwrap();
- positions.push(Position {
- symbol,
- margin_level,
- amount,
- frozen_amount,
- price,
- profit,
- position_mode,
- margin,
- });
- }
- Ok(positions)
- }
- async fn get_ticker(&mut self) -> Result<Ticker, Error> {
- return self.get_ticker_symbol(self.symbol.clone()).await
- }
- async fn get_ticker_symbol(&mut self, symbol: String) -> Result<Ticker, Error> {
- let symbol_format = utils::format_symbol(symbol.clone(), "");
- let res_data = self.request.get_tickers(symbol_format).await;
- if res_data.code == 200 {
- let res_data_json = res_data.data;
- let ticker_info = res_data_json[0].clone();
- let time = (Decimal::from_str(&*ticker_info["ts"].as_str().unwrap()).unwrap() / dec!(1000)).floor().to_i64().unwrap();
- let result = Ticker {
- time,
- high: Decimal::from_str(ticker_info["high24h"].as_str().unwrap()).unwrap(),
- low: Decimal::from_str(ticker_info["low24h"].as_str().unwrap()).unwrap(),
- sell: Decimal::from_str(ticker_info["askPr"].as_str().unwrap()).unwrap(),
- sell_volume: Default::default(),
- buy: Decimal::from_str(ticker_info["bidPr"].as_str().unwrap()).unwrap(),
- buy_volume: Default::default(),
- last: Decimal::from_str(ticker_info["lastPr"].as_str().unwrap()).unwrap(),
- volume: Decimal::from_str(ticker_info["quoteVolume"].as_str().unwrap()).unwrap(),
- };
- Ok(result)
- } else {
- Err(Error::new(ErrorKind::Other, res_data.to_string()))
- }
- }
- async fn get_market(&mut self) -> Result<Market, Error> {
- self.get_market_symbol(self.symbol.clone()).await
- }
- async fn get_market_symbol(&mut self, symbol: String) -> Result<Market, Error> {
- let symbol_format = utils::format_symbol(symbol.clone(), "");
- let response = self.request.get_contracts(symbol_format.clone()).await;
- if response.code == 200 {
- let res_data_json = response.data.as_array().unwrap();
- let market_info = res_data_json[0].clone();
- info!(?market_info);
- if !market_info["symbol"].as_str().unwrap().to_string().eq(&symbol_format) {
- return Err(Error::new(ErrorKind::NotFound, format!("符号未找到:symbol={}, response={:?}", symbol_format, response))).unwrap();
- }
- let base_asset = market_info["baseCoin"].as_str().unwrap().to_string();
- let quote_asset = market_info["quoteCoin"].as_str().unwrap().to_string();
- let price_precision = Decimal::from_str(market_info["pricePlace"].as_str().unwrap()).unwrap();
- let tick_size = Decimal::TEN.powd(Decimal::NEGATIVE_ONE * price_precision);
- let amount_precision = Decimal::from_str(market_info["volumePlace"].as_str().unwrap()).unwrap();
- let amount_size = Decimal::TEN.powd(Decimal::NEGATIVE_ONE * amount_precision);
- let min_qty = Decimal::NEGATIVE_ONE;
- let max_qty = Decimal::NEGATIVE_ONE;
- // let ct_val = Decimal::from_str(&market_info["sizeMultiplier"].as_str().unwrap()).unwrap();
- let ct_val = Decimal::ONE;
- let result = Market {
- symbol: format!("{}_{}", base_asset, quote_asset),
- base_asset,
- quote_asset,
- tick_size,
- amount_size,
- price_precision,
- amount_precision,
- min_qty,
- max_qty,
- min_notional: min_qty,
- max_notional: max_qty,
- ct_val,
- };
- Ok(result)
- } else {
- Err(Error::new(ErrorKind::Other, response.to_string()))
- }
- }
- async fn get_order_detail(&mut self, order_id: &str, custom_id: &str) -> Result<Order, Error> {
- let symbol_format = utils::format_symbol(self.symbol.clone(), "");
- let params = json!({
- "symbol": symbol_format,
- "productType": "USDT-FUTURES",
- "clientOid": custom_id,
- "orderId": order_id
- });
- let ct_val = self.market.ct_val;
- let response = self.request.get_order(params).await;
- if response.code == 200 {
- let res_data_json = response.data;
- let result = format_order_item(res_data_json, ct_val);
- Ok(result)
- } else {
- Err(Error::new(ErrorKind::Other, response.to_string()))
- }
- }
- async fn get_orders_list(&mut self, _status: &str) -> Result<Vec<Order>, Error> {
- Err(Error::new(ErrorKind::NotFound, "bitget_swap get_orders_list:该交易所方法未实现".to_string()))
- // let symbol_format = utils::format_symbol(self.symbol.clone(), "");
- // let ct_val = self.market.ct_val;
- // let res_data = self.request.get_unfilled_orders(symbol_format.to_string(), "".to_string(), "".to_string(), "".to_string(), "100".to_string(), "".to_string()).await;
- // if res_data.code == 200 {
- // let res_data_str = &res_data.data;
- // let res_data_json: Vec<serde_json::Value> = serde_json::from_str(res_data_str).unwrap();
- // let result = res_data_json.iter().map(|item| format_order_item(item.clone(), ct_val)).collect();
- // Ok(result)
- // } else {
- // Err(Error::new(ErrorKind::Other, res_data.to_string()))
- // }
- }
- async fn take_order(&mut self, custom_id: &str, origin_side: &str, price: Decimal, amount: Decimal) -> Result<Order, Error> {
- let ct_val = self.market.ct_val;
- return self.take_order_symbol(self.symbol.clone(), ct_val, custom_id, origin_side, price, amount).await;
- }
- async fn take_order_symbol(&mut self, symbol: String, ct_val: Decimal, custom_id: &str, origin_side: &str, price: Decimal, amount: Decimal) -> Result<Order, Error> {
- let symbol_format = utils::format_symbol(symbol, "");
- let final_size = amount / ct_val;
- let mut params = json!({
- "symbol": symbol_format,
- "clientOid": custom_id,
- "productType": "USDT-FUTURES",
- "marginMode": "crossed",
- "marginCoin": "USDT",
- "size": final_size.to_string()
- });
- if price.eq(&Decimal::ZERO) {
- params["orderType"] = json!("market");
- params["force"] = json!("gtc");
- } else {
- params["price"] = json!(price.to_string());
- params["orderType"] = json!("limit");
- params["force"] = json!("gtc");
- };
- match origin_side {
- "kd" => {
- params["side"] = json!("buy");
- params["tradeSide"] = json!("open");
- }
- "pd" => {
- params["side"] = json!("buy");
- params["tradeSide"] = json!("close");
- }
- "kk" => {
- params["side"] = json!("sell");
- params["tradeSide"] = json!("open");
- }
- "pk" => {
- params["side"] = json!("sell");
- params["tradeSide"] = json!("close");
- }
- _ => { panic!("bitget_usdt_swap 下单参数错误"); }
- };
- let res_data = self.request.swap_order(params).await;
- if res_data.code != 200 {
- return Err(Error::new(ErrorKind::Other, res_data.to_string()))
- }
- let res_data_json = res_data.data;
- let result = Order {
- id: res_data_json["orderId"].as_str().unwrap().to_string(),
- custom_id: res_data_json["clientOid"].as_str().unwrap().to_string(),
- price: Decimal::ZERO,
- amount: Decimal::ZERO,
- deal_amount: Decimal::ZERO,
- avg_price: Decimal::ZERO,
- status: "NEW".to_string(),
- order_type: "".to_string(),
- trace_stack: TraceStack::new(0, Instant::now()).on_special("328 bitget_swap".to_string()),
- };
- return Ok(result)
- }
- async fn cancel_order(&mut self, order_id: &str, custom_id: &str) -> Result<Order, Error> {
- let symbol_format = utils::format_symbol(self.symbol.clone(), "");
- let params = json!({
- "symbol": symbol_format,
- "productType": "USDT-FUTURES",
- "clientOid": custom_id,
- "orderId": order_id
- });
- let response = self.request.cancel_order(params).await;
- // 取消失败,进行报错
- if response.code != 200 {
- return Err(Error::new(ErrorKind::NotFound, response.to_string()));
- }
- let res_data_json = response.data;
- let result = Order {
- id: res_data_json["orderId"].as_str().unwrap().to_string(),
- custom_id: res_data_json["clientOid"].as_str().unwrap().to_string(),
- price: Decimal::ZERO,
- amount: Decimal::ZERO,
- deal_amount: Decimal::ZERO,
- avg_price: Decimal::ZERO,
- status: "REMOVE".to_string(),
- order_type: "".to_string(),
- trace_stack: TraceStack::new(0, Instant::now()).on_special("443 bitget_swap".to_string()),
- };
- Ok(result)
- }
- async fn cancel_orders(&mut self) -> Result<Vec<Order>, Error> {
- Err(Error::new(ErrorKind::NotFound, "bitget_swap cancel_orders:该交易所方法未实现".to_string()))
- }
- async fn cancel_orders_all(&mut self) -> Result<Vec<Order>, Error> {
- let response = self.request.get_pending_orders().await;
- if response.code == 200 {
- info!("{}", response.data.to_string());
- let mut result = vec![];
- if !response.data["entrustedList"].is_null() {
- let orders_res_data_json = response.data["entrustedList"].as_array().unwrap();
- for order in orders_res_data_json {
- let symbol = order["symbol"].as_str().unwrap().to_string();
- let order_id = order["orderId"].as_str().unwrap().to_string();
- let params = json!({
- "symbol": symbol,
- "productType": "USDT-FUTURES",
- "orderId": order_id,
- });
- let cancel_res_data = self.request.cancel_order(params).await;
- if cancel_res_data.code == 200 {
- let cancel_res_data_json = cancel_res_data.data;
- result.push(Order {
- id: cancel_res_data_json["orderId"].as_str().unwrap().to_string(),
- custom_id: cancel_res_data_json["clientOid"].as_str().unwrap().to_string(),
- price: Decimal::ZERO,
- amount: Decimal::ZERO,
- deal_amount: Decimal::ZERO,
- avg_price: Decimal::ZERO,
- status: "REMOVE".to_string(),
- order_type: "".to_string(),
- trace_stack: TraceStack::new(0, Instant::now()).on_special("457 bitget_swap".to_string()),
- });
- } else {
- return Err(Error::new(ErrorKind::Other, cancel_res_data.to_string()));
- }
- }
- }
- Ok(result)
- } else {
- Err(Error::new(ErrorKind::Other, response.to_string()))
- }
- }
- async fn take_stop_loss_order(&mut self, _stop_price: Decimal, _price: Decimal, _side: &str) -> Result<Value, Error> {
- Err(Error::new(ErrorKind::NotFound, "bitget_swap take_stop_loss_order:该交易所方法未实现".to_string()))
- }
- async fn cancel_stop_loss_order(&mut self, _order_id: &str) -> Result<Value, Error> {
- Err(Error::new(ErrorKind::NotFound, "bitget_swap cancel_stop_loss_order:该交易所方法未实现".to_string()))
- }
- async fn set_dual_mode(&mut self, _coin: &str, is_dual_mode: bool) -> Result<String, Error> {
- let pos_mode = if is_dual_mode {
- "hedge_mode"
- } else {
- "one_way_mode"
- };
- let params = json!({
- "productType": "USDT-FUTURES",
- "posMode": pos_mode,
- });
- let response = self.request.set_position_mode(params).await;
- if response.code != 200 {
- return Err(Error::new(ErrorKind::Other, format!("设置持仓模式失败:{:?}", response).to_string()))
- }
- return Ok(response.data.to_string());
- }
- async fn set_dual_leverage(&mut self, leverage: &str) -> Result<String, Error> {
- let params = json!({
- "symbol": "ETHUSDT",
- "productType": "USDT-FUTURES",
- "marginCoin": "USDT",
- "leverage": leverage
- });
- let response = self.request.set_leverage(params).await;
- if response.code != 200 {
- return Err(Error::new(ErrorKind::Other, format!("设置杠杆失败:{:?}", response).to_string()))
- }
- return Ok(response.data.to_string());
- }
- async fn set_auto_deposit_status(&mut self, _status: bool) -> Result<String, Error> {
- Err(Error::new(ErrorKind::NotFound, "bitget_swap set_auto_deposit_status:该交易所方法未实现".to_string()))
- }
- async fn wallet_transfers(&mut self, _coin: &str, _from: &str, _to: &str, _amount: Decimal) -> Result<String, Error> {
- Err(Error::new(ErrorKind::NotFound, "bitget_swap wallet_transfers:该交易所方法未实现".to_string()))
- // let coin_format = coin.to_string().to_uppercase();
- // let res_data = self.request.wallet_transfer(from.to_string(), to.to_string(), amount.to_string(), coin_format.clone(), "".to_string(), "".to_string()).await;
- // if res_data.code == 200 {
- // let res_data_str = &res_data.data;
- // let result = res_data_str.clone();
- // Ok(result)
- // } else {
- // Err(Error::new(ErrorKind::Other, res_data.to_string()))
- // }
- }
- async fn command_order(&mut self, order_command: &mut OrderCommand, trace_stack: &TraceStack) {
- let mut handles = vec![];
- // 下单指令
- for item in order_command.limits_open.keys() {
- let mut ts = trace_stack.clone();
- let amount = Decimal::from_str(&*order_command.limits_open[item].get(0).unwrap().clone()).unwrap();
- let side = order_command.limits_open[item].get(1).unwrap().clone();
- let price = Decimal::from_str(&*order_command.limits_open[item].get(2).unwrap().clone()).unwrap();
- let cid = order_command.limits_open[item].get(3).unwrap().clone();
- // order_name: [数量,方向,价格,c_id]
- let mut self_clone = self.clone();
- let handle = spawn(async move {
- // TraceStack::show_delay(&ts.ins);
- ts.on_before_send();
- let result = self_clone.take_order(cid.as_str(), side.as_str(), price, amount).await;
- ts.on_after_send();
- match result {
- Ok(mut result) => {
- result.trace_stack = ts;
- self_clone.order_sender.send(result).await.unwrap();
- }
- Err(error) => {
- info!(?error);
- let mut err_order = Order::new();
- err_order.custom_id = cid.clone();
- err_order.status = "REMOVE".to_string();
- self_clone.order_sender.send(err_order).await.unwrap();
- self_clone.error_sender.send(error).await.unwrap();
- }
- }
- });
- handles.push(handle)
- }
- let futures = FuturesUnordered::from_iter(handles);
- // 等待所有任务完成
- let _: Result<Vec<_>, _> = futures.try_collect().await;
- // 撤销订单
- let mut cancel_handlers = vec![];
- for item in order_command.cancel.keys() {
- let order_id = order_command.cancel[item].get(1).unwrap().clone();
- let custom_id = order_command.cancel[item].get(0).unwrap().clone();
- let mut self_clone = self.clone();
- let handle = spawn(async move {
- let result = self_clone.cancel_order(&order_id, &custom_id).await;
- match result {
- Ok(_) => {
- // result_sd.send(result).await.unwrap();
- }
- Err(error) => {
- // 取消失败去查订单。
- let query_rst = self_clone.get_order_detail(&order_id, &custom_id).await;
- match query_rst {
- Ok(order) => {
- self_clone.order_sender.send(order).await.unwrap();
- }
- Err(err) => {
- error!("撤单失败,而且查单也失败了,bitget_swap,oid={}, cid={}, err={:?}。", order_id.clone(), custom_id.clone(), err);
- }
- }
- self_clone.error_sender.send(error).await.unwrap();
- }
- }
- });
- cancel_handlers.push(handle)
- }
- let futures = FuturesUnordered::from_iter(cancel_handlers);
- // 等待所有任务完成
- let _: Result<Vec<_>, _> = futures.try_collect().await;
- // 检查订单指令
- let mut check_handlers = vec![];
- for item in order_command.check.keys() {
- let order_id = order_command.check[item].get(1).unwrap().clone();
- let custom_id = order_command.check[item].get(0).unwrap().clone();
- let mut self_clone = self.clone();
- let handle = spawn(async move {
- let result = self_clone.get_order_detail(order_id.as_str(), custom_id.as_str()).await;
- match result {
- Ok(result) => {
- self_clone.order_sender.send(result).await.unwrap();
- }
- Err(error) => {
- self_clone.error_sender.send(error).await.unwrap();
- }
- }
- });
- check_handlers.push(handle)
- }
- let futures = FuturesUnordered::from_iter(check_handlers);
- // 等待所有任务完成
- let _: Result<Vec<_>, _> = futures.try_collect().await;
- }
- }
- // pub fn format_account_info(balance_data: Value) -> Account {
- // let balance_coin = balance_data["coin"].as_str().unwrap().to_string().to_uppercase();
- // let available_balance = Decimal::from_str(balance_data["available"].as_str().unwrap()).unwrap();
- // let frozen_balance = Decimal::from_str(balance_data["frozen"].as_str().unwrap()).unwrap();
- // let balance = available_balance + frozen_balance;
- //
- // Account {
- // coin: balance_coin,
- // balance,
- // available_balance,
- // frozen_balance,
- // stocks: Decimal::ZERO,
- // available_stocks: Decimal::ZERO,
- // frozen_stocks: Decimal::ZERO,
- // }
- // }
- pub fn format_order_item(order: Value, ct_val: Decimal) -> Order {
- let price = Decimal::from_str(order["price"].as_str().unwrap_or(order["priceAvg"].as_str().unwrap())).unwrap();
- let size = Decimal::from_str(order["size"].as_str().unwrap()).unwrap();
- let status = order["state"].as_str().unwrap();
- let base_volume = Decimal::from_str(order["quoteVolume"].as_str().unwrap()).unwrap();
- let avg_price = if order["priceAvg"].is_null() || order["priceAvg"].as_str().unwrap().is_empty() {
- Decimal::ZERO
- } else {
- Decimal::from_str(order["priceAvg"].as_str().unwrap().to_string().as_str()).unwrap()
- };
- let amount = size * ct_val;
- let deal_amount = base_volume * ct_val;
- let custom_status = if ["filled", "cancelled"].contains(&status) {
- "REMOVE".to_string()
- } else if ["init", "live", "new", "partially_filled"].contains(&status) {
- "NEW".to_string()
- } else {
- "NULL".to_string()
- };
- Order {
- id: order["orderId"].as_str().unwrap().to_string(),
- custom_id: order["clientOid"].as_str().unwrap().to_string(),
- price,
- amount,
- deal_amount,
- avg_price,
- status: custom_status,
- order_type: order["orderType"].as_str().unwrap().to_string(),
- trace_stack: TraceStack::new(0, Instant::now()).on_special("700 bitget_swap".to_string()),
- }
- }
|