| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791 |
- use std::collections::{BTreeMap};
- use std::io::{Error, ErrorKind};
- use std::str::FromStr;
- use tokio::sync::mpsc::Sender;
- use async_trait::async_trait;
- use futures::stream::FuturesUnordered;
- use futures::TryStreamExt;
- use rust_decimal::Decimal;
- use serde_json::{from_value, json, Value};
- use rust_decimal::prelude::FromPrimitive;
- use serde::{Deserialize, Serialize};
- use tokio::time::Instant;
- use tracing::{error, info, trace};
- use exchanges::bybit_swap_rest::BybitSwapRest;
- use crate::{Platform, ExchangeEnum, Account, Position, Ticker, Market, Order, OrderCommand, PositionModeEnum, Record};
- use global::trace_stack::TraceStack;
- #[derive(Debug, Clone, Deserialize, Serialize)]
- #[serde(rename_all = "camelCase")]
- struct SwapTicker {
- symbol: String,
- high_price24h: Decimal,
- low_price24h: Decimal,
- bid1_price: Decimal,
- ask1_price: Decimal,
- last_price: Decimal,
- volume24h: Decimal
- }
- #[allow(dead_code)]
- #[derive(Clone)]
- pub struct BybitSwap {
- exchange: ExchangeEnum,
- symbol: String,
- symbol_uppercase: String,
- is_colo: bool,
- params: BTreeMap<String, String>,
- request: BybitSwapRest,
- market: Market,
- order_sender: Sender<Order>,
- error_sender: Sender<Error>,
- }
- impl BybitSwap {
- pub async fn new(symbol: String, is_colo: bool, params: BTreeMap<String, String>, order_sender: Sender<Order>, error_sender: Sender<Error>) -> BybitSwap {
- let market = Market::new();
- let mut bybit_swap = BybitSwap {
- exchange: ExchangeEnum::BybitSwap,
- symbol: symbol.to_uppercase(),
- symbol_uppercase: symbol.replace("_", "").to_uppercase(),
- is_colo,
- params: params.clone(),
- request: BybitSwapRest::new(is_colo, params.clone()),
- market,
- order_sender,
- error_sender,
- };
- // 修改持仓模式
- let symbol_array: Vec<&str> = symbol.split("_").collect();
- let mode_result = bybit_swap.set_dual_mode(symbol_array[1], false).await;
- match mode_result {
- Ok(_) => {
- trace!("Bybit:设置持仓模式成功!")
- }
- Err(error) => {
- error!("Bybit:设置持仓模式失败!mode_result={}", error)
- }
- }
- // 设置持仓杠杆
- let lever_rate_result = bybit_swap.set_dual_leverage("1").await;
- match lever_rate_result {
- Ok(ok) => {
- info!("Bybit:设置持仓杠杆成功!{:?}", ok);
- }
- Err(error) => {
- error!("Bybit:设置持仓杠杆失败!{:?}", error)
- }
- }
- // 获取市场信息
- bybit_swap.market = BybitSwap::get_market(&mut bybit_swap).await.unwrap_or(bybit_swap.market);
- bybit_swap
- }
- }
- #[async_trait]
- impl Platform for BybitSwap {
- // 克隆方法
- fn clone_box(&self) -> Box<dyn Platform + Send + Sync> { Box::new(self.clone()) }
- // 获取交易所模式
- fn get_self_exchange(&self) -> ExchangeEnum {
- ExchangeEnum::BybitSwap
- }
- // 获取交易对
- fn get_self_symbol(&self) -> String { self.symbol.clone() }
- // 获取是否使用高速通道
- fn get_self_is_colo(&self) -> bool {
- self.is_colo
- }
- // 获取params信息
- fn get_self_params(&self) -> BTreeMap<String, String> {
- self.params.clone()
- }
- // 获取market信息
- fn get_self_market(&self) -> Market { self.market.clone() }
- // 获取请求时间
- fn get_request_delays(&self) -> Vec<i64> { self.request.get_delays() }
- // 获取请求平均时间
- fn get_request_avg_delay(&self) -> Decimal { self.request.get_avg_delay() }
- // 获取请求最大时间
- fn get_request_max_delay(&self) -> i64 { self.request.get_max_delay() }
- // 获取服务器时间
- async fn get_server_time(&mut self) -> Result<String, Error> {
- let res_data = self.request.get_server_time().await;
- if res_data.code == 200 {
- let result = res_data.data["server_time"].to_string();
- Ok(result)
- } else {
- Err(Error::new(ErrorKind::Other, res_data.to_string()))
- }
- }
- // 获取账号信息
- async fn get_account(&mut self) -> Result<Account, Error> {
- let symbol_array: Vec<&str> = self.symbol.split("_").collect();
- let res_data = self.request.get_account_balance(symbol_array[1].parse().unwrap()).await;
- if res_data.code == 200 {
- let arr_infos: Vec<Value> = from_value(res_data.data["list"].clone()).unwrap();
- if arr_infos.len() < 1usize{
- return Err(Error::new(ErrorKind::NotFound, format!("{} 无账户信息", symbol_array[1])));
- }
- let coin_infos: Vec<Value> = from_value(arr_infos[0]["coin"].clone()).unwrap();
- if coin_infos.len() < 1usize{
- return Err(Error::new(ErrorKind::NotFound, format!("{} 无账户信息", symbol_array[1])));
- }
- let balance = Decimal::from_str(coin_infos[0]["equity"].as_str().unwrap()).unwrap();
- let available_balance = Decimal::from_str(coin_infos[0]["walletBalance"].as_str().unwrap()).unwrap();
- let frozen_balance = balance - available_balance;
- let result = Account {
- coin: symbol_array[1].to_string(),
- balance,
- available_balance,
- frozen_balance,
- stocks: Decimal::ZERO,
- available_stocks: Decimal::ZERO,
- frozen_stocks: Decimal::ZERO,
- };
- Ok(result)
- } else {
- Err(Error::new(ErrorKind::Other, res_data.to_string()))
- }
- }
- async fn get_spot_account(&mut self) -> Result<Vec<Account>, Error> {
- Err(Error::new(ErrorKind::NotFound, "bybit_swap:该交易所方法未实现".to_string()))
- }
- // 获取持仓信息
- async fn get_position(&mut self) -> Result<Vec<Position>, Error> {
- let symbol = self.symbol_uppercase.clone();
- let ct_val = self.market.multiplier;
- let res_data = self.request.get_positions(symbol, "".to_string()).await;
- if res_data.code == 200 {
- let result = res_data.data["list"].as_array().unwrap().iter().map(|item| { format_position_item(item, ct_val) }).collect();
- Ok(result)
- } else {
- Err(Error::new(ErrorKind::Other, res_data.to_string()))
- }
- }
- // 获取所有持仓
- async fn get_positions(&mut self) -> Result<Vec<Position>, Error> {
- let symbol_array: Vec<&str> = self.symbol.split("_").collect();
- let ct_val = self.market.multiplier;
- let res_data = self.request.get_positions("".to_string(), symbol_array[1].to_string().to_uppercase()).await;
- if res_data.code == 200 {
- info!("{}", res_data.data.to_string());
- let result = res_data.data["list"].as_array().unwrap().iter().map(|item| { format_position_item(item, ct_val) }).collect();
- Ok(result)
- } else {
- Err(Error::new(ErrorKind::Other, res_data.to_string()))
- }
- }
- // 获取市场行情
- async fn get_ticker(&mut self) -> Result<Ticker, Error> {
- let symbol = self.symbol_uppercase.clone();
- let res_data = self.request.get_tickers(symbol).await;
- if res_data.code == 200 {
- let list :Vec<SwapTicker> = from_value(res_data.data["list"].clone()).unwrap_or(Vec::new());
- if list.len() < 1usize {
- error!("bybit_swap:获取Ticker信息错误!\nget_ticker:res_data={:?}", res_data);
- return Err(Error::new(ErrorKind::Other, res_data.to_string()));
- }
- let value = list[0].clone();
- Ok(Ticker{
- time: Decimal::from_i64(chrono::Utc::now().timestamp_millis()).unwrap(),
- high: value.high_price24h,
- low: value.low_price24h,
- sell: value.ask1_price,
- buy: value.bid1_price,
- last: value.last_price,
- volume: value.volume24h,
- open_interest: Default::default(),
- })
- } else {
- Err(Error::new(ErrorKind::Other, res_data.to_string()))
- }
- }
- async fn get_record(&mut self, interval: String) -> Result<Vec<Record>, Error> {
- let symbol = self.symbol_uppercase.clone();
- let res_data = self.request.get_record(symbol, interval, "100".to_string()).await;
- if res_data.code == 200 {
- let mut records: Vec<Record> = vec![];
- for record_value in res_data.data["list"].as_array().unwrap() {
- let value_array = record_value.as_array().unwrap();
- records.push(Record {
- time: Decimal::from_str(value_array[0].as_str().unwrap()).unwrap(),
- open: Decimal::from_str(value_array[1].as_str().unwrap()).unwrap(),
- high: Decimal::from_str(value_array[2].as_str().unwrap()).unwrap(),
- low: Decimal::from_str(value_array[3].as_str().unwrap()).unwrap(),
- close: Decimal::from_str(value_array[4].as_str().unwrap()).unwrap(),
- volume: Decimal::from_str(value_array[5].as_str().unwrap()).unwrap(),
- symbol: "".to_string(),
- });
- }
- records.reverse();
- Ok(records)
- } else {
- Err(Error::new(ErrorKind::Other, res_data.to_string()))
- }
- }
- async fn get_ticker_symbol(&mut self, symbol: String) -> Result<Ticker, Error> {
- let symbol_upper = symbol.replace("_", "").to_uppercase();
- let res_data = self.request.get_tickers(symbol_upper.clone()).await;
- if res_data.code == 200 {
- let list: Vec<SwapTicker> = from_value(res_data.data["list"].clone()).unwrap();
- let ticker_info = list.iter().find(|&item| item.symbol == symbol_upper);
- match ticker_info {
- None => {
- error!("bybit_swap:获取Ticker信息错误!\nget_ticker:res_data={:?}", res_data);
- Err(Error::new(ErrorKind::Other, res_data.to_string()))
- }
- Some(value) => {
- let result = Ticker {
- time: Decimal::from_i64(chrono::Utc::now().timestamp_millis()).unwrap(),
- high: value.high_price24h,
- low: value.low_price24h,
- sell: value.ask1_price,
- buy: value.bid1_price,
- last: value.last_price,
- volume: value.volume24h,
- open_interest: Default::default(),
- };
- Ok(result)
- }
- }
- } else {
- Err(Error::new(ErrorKind::Other, res_data.to_string()))
- }
- }
- async fn get_market(&mut self) -> Result<Market, Error> {
- let symbol = self.symbol_uppercase.clone();
- let res_data = self.request.get_instruments_info(symbol.clone()).await;
- if res_data.code == 200 {
- let arr_data: Vec<Value> = from_value(res_data.data["list"].clone()).unwrap();
- let market_info = arr_data.iter().find(|&item| item["symbol"].as_str().unwrap() == symbol);
- match market_info {
- None => {
- error!("bybit_swap:获取Market信息错误!\nget_market:res_data={:?}", res_data);
- Err(Error::new(ErrorKind::Other, res_data.to_string()))
- }
- Some(value) => {
- let base_coin = value["baseCoin"].as_str().unwrap();
- let quote_coin = value["quoteCoin"].as_str().unwrap();
- let name = format!("{}_{}",base_coin, quote_coin);
- let tick_size = Decimal::from_str(value["priceFilter"]["minPrice"].as_str().unwrap().trim()).unwrap();
- let min_qty = Decimal::from_str(value["lotSizeFilter"]["minOrderQty"].as_str().unwrap().trim()).unwrap();
- let max_qty = Decimal::from_str(value["lotSizeFilter"]["maxOrderQty"].as_str().unwrap().trim()).unwrap();
- let ct_val = Decimal::ONE;
- let amount_size = min_qty * ct_val;
- let price_precision = Decimal::from_u32(tick_size.scale()).unwrap();
- let amount_precision = Decimal::from_u32(amount_size.scale()).unwrap();
- let min_notional = Decimal::from_str(value["lotSizeFilter"]["minNotionalValue"].as_str().unwrap().trim()).unwrap();
- let max_notional = max_qty * ct_val;
- let result = Market {
- symbol: name,
- base_asset: base_coin.to_string(),
- quote_asset: quote_coin.to_string(),
- tick_size,
- amount_size,
- price_precision,
- amount_precision,
- min_qty,
- max_qty,
- min_notional,
- max_notional,
- multiplier: ct_val,
- };
- Ok(result)
- }
- }
- } else {
- Err(Error::new(ErrorKind::Other, res_data.to_string()))
- }
- }
- async fn get_market_symbol(&mut self, symbol: String) -> Result<Market, Error> {
- let symbol = symbol.replace("_", "").to_uppercase();
- let res_data = self.request.get_instruments_info(symbol.clone()).await;
- if res_data.code == 200 {
- let arr_data: Vec<Value> = from_value(res_data.data["list"].clone()).unwrap();
- let market_info = arr_data.iter().find(|item| item["symbol"].as_str().unwrap() == symbol);
- match market_info {
- None => {
- error!("bybit_swap:获取Market信息错误!\nget_market:res_data={:?}", res_data);
- Err(Error::new(ErrorKind::Other, res_data.to_string()))
- }
- Some(value) => {
- let base_coin = value["baseCoin"].as_str().unwrap();
- let quote_coin = value["quoteCoin"].as_str().unwrap();
- let name = format!("{}_{}",base_coin, quote_coin);
- let tick_size = Decimal::from_str(value["priceFilter"]["minPrice"].as_str().unwrap().trim()).unwrap();
- let min_qty = Decimal::from_str(value["lotSizeFilter"]["minOrderQty"].as_str().unwrap().trim()).unwrap();
- let max_qty = Decimal::from_str(value["lotSizeFilter"]["maxOrderQty"].as_str().unwrap().trim()).unwrap();
- let ct_val = Decimal::ONE;
- let amount_size = min_qty * ct_val;
- let price_precision = Decimal::from_u32(tick_size.scale()).unwrap();
- let amount_precision = Decimal::from_u32(amount_size.scale()).unwrap();
- let min_notional = Decimal::from_str(value["lotSizeFilter"]["minNotionalValue"].as_str().unwrap().trim()).unwrap();
- let max_notional = max_qty * ct_val;
- let result = Market {
- symbol: name,
- base_asset: base_coin.to_string(),
- quote_asset: quote_coin.to_string(),
- tick_size,
- amount_size,
- price_precision,
- amount_precision,
- min_qty,
- max_qty,
- min_notional,
- max_notional,
- multiplier: ct_val,
- };
- Ok(result)
- }
- }
- } else {
- Err(Error::new(ErrorKind::Other, res_data.to_string()))
- }
- }
- // 获取订单详情
- async fn get_order_detail(&mut self, order_id: &str, custom_id: &str) -> Result<Order, Error> {
- let symbol = self.symbol_uppercase.clone();
- let ct_val = self.market.multiplier;
- let id = if !custom_id.trim().eq("") { format!("t-{}", custom_id) } else { String::new() };
- let res_data = self.request.get_order(symbol, order_id.parse().unwrap(), id).await;
- if res_data.code == 200 {
- let res_data_json: Value = res_data.data["list"].clone();
- if res_data_json.is_array() && res_data_json.as_array().unwrap().len() == 0 {
- return Err(Error::new(ErrorKind::Other, "没有该订单!"));
- }
- let result = format_order_item(res_data_json.as_array().unwrap()[0].clone(), ct_val);
- Ok(result)
- } else {
- Err(Error::new(ErrorKind::Other, res_data.to_string()))
- }
- }
- // 获取订单列表
- async fn get_orders_list(&mut self, _status: &str) -> Result<Vec<Order>, Error> {
- Err(Error::new(ErrorKind::Other, "bybit获取订单列表暂未实现".to_string()))
- }
- // 下单接口
- async fn take_order(&mut self, custom_id: &str, origin_side: &str, price: Decimal, amount: Decimal) -> Result<Order, Error> {
- let symbol = self.symbol_uppercase.clone();
- let ct_val = self.market.multiplier;
- let size = amount / ct_val;
- let mut params = json!({
- "orderLinkId": format!("t-{}", custom_id),
- "symbol": symbol.to_string(),
- "price": price.to_string(),
- "category": "linear",
- "orderType":"Limit",
- "qty": json!(size),
- // 0.單向持倉 1.買側雙向持倉 2.賣側雙向持倉
- // "positionIdx": json!(1),
- "positionIdx": json!(0),
- "reduceOnly": json!(false)
- });
- if price.eq(&Decimal::ZERO) {
- params["timeInForce"] = json!("IOC".to_string());
- }
- match origin_side {
- "kd" => {
- params["side"] = json!("Buy");
- }
- "pd" => {
- params["side"] = json!("Sell");
- // 减仓
- params["reduceOnly"] = json!(true);
- }
- "kk" => {
- params["side"] = json!("Sell");
- // params["positionIdx"] = json!(2);
- }
- "pk" => {
- params["side"] = json!("Buy");
- // 减仓
- params["reduceOnly"] = json!(true);
- // params["positionIdx"] = json!(2);
- }
- _ => { error!("下单参数错误"); }
- };
- let res_data = self.request.swap_order(params).await;
- if res_data.code == 200 {
- let result = format_new_order_item(res_data.data, price, size);
- Ok(result)
- } else {
- Err(Error::new(ErrorKind::Other, res_data.to_string()))
- }
- }
- async fn take_order_symbol(&mut self, symbol: String, ct_val: Decimal, custom_id: &str, origin_side: &str, price: Decimal, amount: Decimal) -> Result<Order, Error> {
- let symbol_upper = symbol.replace("_", "").trim().to_uppercase();
- let size = (amount / ct_val).floor();
- let order_type = if price == Decimal::ZERO {
- "Market"
- } else {
- "Limit"
- };
- let mut params = json!({
- "orderLinkId": format!("t-{}", custom_id),
- "symbol": symbol_upper,
- "price": price.to_string(),
- "category": "linear",
- "orderType": order_type,
- "qty": json!(size),
- // 0.單向持倉 1.買側雙向持倉 2.賣側雙向持倉
- "positionIdx": json!(0),
- "reduceOnly": json!(false)
- });
- if price.eq(&Decimal::ZERO) {
- params["timeInForce"] = json!("IOC".to_string());
- }
- match origin_side {
- "kd" => {
- params["side"] = json!("Buy");
- }
- "pd" => {
- params["side"] = json!("Sell");
- // params["positionIdx"] = json!(1);
- // 减仓
- params["reduceOnly"] = json!(true);
- }
- "kk" => {
- params["side"] = json!("Sell");
- // params["positionIdx"] = json!(2);
- }
- "pk" => {
- params["side"] = json!("Buy");
- // params["positionIdx"] = json!(2);
- // 减仓
- params["reduceOnly"] = json!(true);
- }
- _ => { error!("下单参数错误"); }
- };
- let res_data = self.request.swap_order(params.clone()).await;
- if res_data.code == 200 {
- let result = format_new_order_item(res_data.data, price, size);
- Ok(result)
- } else {
- Err(Error::new(ErrorKind::Other, res_data.to_string()))
- }
- }
- // 撤销订单
- async fn cancel_order(&mut self, order_id: &str, custom_id: &str) -> Result<Order, Error> {
- let symbol = self.symbol_uppercase.clone();
- let id = format!("t-{}", custom_id);
- let res_data = self.request.cancel_order(symbol, String::from(order_id), id.clone()).await;
- if res_data.code == 200 {
- let result = format_cancel_order_item(res_data.data);
- Ok(result)
- } else {
- Err(Error::new(ErrorKind::Other, res_data.to_string()))
- }
- }
- // 批量撤销订单
- async fn cancel_orders(&mut self) -> Result<Vec<Order>, Error> {
- let symbol = self.symbol_uppercase.clone();
- let res_data = self.request.cancel_orders(symbol).await;
- if res_data.code == 200 {
- info!("{}", res_data.data.to_string());
- let res_arr: Vec<Value> = from_value(res_data.data).unwrap();
- let result = res_arr.iter().map(|item| format_cancel_order_item(item.clone())).collect();
- Ok(result)
- } else {
- Err(Error::new(ErrorKind::Other, res_data.to_string()))
- }
- }
- async fn cancel_orders_all(&mut self) -> Result<Vec<Order>, Error> {
- let symbol = self.symbol_uppercase.clone();
- let res_data = self.request.cancel_orders(symbol).await;
- if res_data.code == 200 {
- info!("{}", res_data.data.to_string());
- let res_arr: Vec<Value> = from_value(res_data.data["list"].clone()).unwrap();
- let result = res_arr.iter().map(|item| format_cancel_order_item(item.clone())).collect();
- Ok(result)
- } else {
- Err(Error::new(ErrorKind::Other, res_data.to_string()))
- }
- }
- async fn take_stop_loss_order(&mut self, _stop_price: Decimal, _price: Decimal, _side: &str) -> Result<Value, Error> {
- Err(Error::new(ErrorKind::NotFound, "bybit_swap:该交易所方法未实现".to_string()))
- }
- async fn cancel_stop_loss_order(&mut self, _order_id: &str) -> Result<Value, Error> {
- Err(Error::new(ErrorKind::NotFound, "bybit_swap:该交易所方法未实现".to_string()))
- }
- // 设置持仓模式
- async fn set_dual_mode(&mut self, _coin: &str, is_dual_mode: bool) -> Result<String, Error> {
- let coin_format = self.symbol_uppercase.clone();
- let mut mod_num = 0;
- if is_dual_mode {
- mod_num = 3;
- }
- let res_data = self.request.set_position_mode(coin_format, mod_num).await;
- if res_data.code == 200 {
- Ok(res_data.data.to_string())
- } else {
- Err(Error::new(ErrorKind::Other, res_data.to_string()))
- }
- }
- // 更新双持仓模式下杠杆
- async fn set_dual_leverage(&mut self, leverage: &str) -> Result<String, Error> {
- let symbol = self.symbol_uppercase.clone();
- let res_data = self.request.set_leverage(symbol, leverage.to_string()).await;
- if res_data.code == 200 {
- Ok(res_data.data.to_string())
- } else {
- Err(Error::new(ErrorKind::Other, res_data.to_string()))
- }
- }
- async fn set_auto_deposit_status(&mut self, _status: bool) -> Result<String, Error> { Err(Error::new(ErrorKind::NotFound, "gate:该交易所方法未实现".to_string())) }
- // 交易账户互转
- async fn wallet_transfers(&mut self, _coin: &str, _from: &str, _to: &str, _amount: Decimal) -> Result<String, Error> {
- // let coin_format = coin.to_string().to_lowercase();
- // let res_data = self.request.wallet_transfers(coin_format.clone(), from.to_string(), to.to_string(), amount.to_string(), coin_format.clone()).await;
- // if res_data.code == 200 {
- // let res_data_str = &res_data.data;
- // let result = res_data_str.clone();
- // Ok(result)
- // } else {
- // Err(Error::new(ErrorKind::Other, res_data.to_string()))
- // }
- Err(Error::new(ErrorKind::Other, "暂未实现!"))
- }
- // 指令下单
- async fn command_order(&mut self, order_command: &mut OrderCommand, trace_stack: &TraceStack) {
- // 下单指令
- let mut handles = vec![];
- for item in order_command.limits_open.keys() {
- let mut self_clone = self.clone();
- let amount = Decimal::from_str(order_command.limits_open[item].get(0).unwrap_or(&"0".to_string())).unwrap();
- let side = order_command.limits_open[item].get(1).unwrap().clone();
- let price = Decimal::from_str(order_command.limits_open[item].get(2).unwrap_or(&"0".to_string())).unwrap();
- let cid = order_command.limits_open[item].get(3).unwrap().clone();
- let mut ts = trace_stack.clone();
- let handle = tokio::spawn(async move {
- ts.on_before_send();
- // TraceStack::show_delay(&ts.ins);
- let result = self_clone.take_order(cid.as_str(), side.as_str(), price, amount).await;
- ts.on_after_send();
- match result {
- Ok(mut result) => {
- // 记录此订单完成时间
- result.trace_stack = ts;
- self_clone.order_sender.send(result).await.unwrap();
- }
- Err(error) => {
- error!("bybit:下单失败:{:?}", error);
- let mut err_order = Order::new();
- err_order.custom_id = cid.clone();
- err_order.status = "REMOVE".to_string();
- self_clone.order_sender.send(err_order).await.unwrap();
- self_clone.error_sender.send(error).await.unwrap();
- }
- }
- });
- handles.push(handle)
- }
- let futures = FuturesUnordered::from_iter(handles);
- let _: Result<Vec<_>, _> = futures.try_collect().await;
- // 撤销订单
- let mut cancel_handles = vec![];
- for item in order_command.cancel.keys() {
- let mut self_clone = self.clone();
- let order_id = order_command.cancel[item].get(1).unwrap().clone();
- let custom_id = order_command.cancel[item].get(0).unwrap().clone();
- let handle = tokio::spawn(async move {
- let result = self_clone.cancel_order(&order_id, &custom_id).await;
- match result {
- Ok(_) => {
- // result_sd.send(result).await.unwrap();
- }
- Err(error) => {
- // 取消失败去查订单。
- let query_rst = self_clone.get_order_detail(&order_id, &custom_id).await;
- match query_rst {
- Ok(order) => {
- self_clone.order_sender.send(order).await.unwrap();
- }
- Err(_err) => {
- error!("bybit:撤单失败,而且查单也失败了,oid={}, cid={}。", order_id.clone(), custom_id.clone());
- }
- }
- self_clone.error_sender.send(error).await.unwrap();
- }
- }
- });
- cancel_handles.push(handle)
- }
- let futures = FuturesUnordered::from_iter(cancel_handles);
- let _: Result<Vec<_>, _> = futures.try_collect().await;
- // 检查订单指令
- let mut check_handles = vec![];
- for item in order_command.check.keys() {
- let mut self_clone = self.clone();
- let order_id = order_command.check[item].get(1).unwrap().clone();
- let custom_id = order_command.check[item].get(0).unwrap().clone();
- let handle = tokio::spawn(async move {
- let result = self_clone.get_order_detail(&order_id, &custom_id).await;
- match result {
- Ok(result) => {
- self_clone.order_sender.send(result).await.unwrap();
- }
- Err(error) => {
- self_clone.error_sender.send(error).await.unwrap();
- }
- }
- });
- check_handles.push(handle)
- }
- let futures = FuturesUnordered::from_iter(check_handles);
- let _: Result<Vec<_>, _> = futures.try_collect().await;
- }
- }
- pub fn format_position_item(position: &Value, ct_val: Decimal) -> Position {
- let position_idx = position["positionIdx"].to_string();
- let mut position_mode = match position_idx.as_str() {
- "0" => PositionModeEnum::Both,
- "1" => PositionModeEnum::Long,
- "2" => PositionModeEnum::Short,
- _ => {
- error!("bybit_swap:格式化持仓模式错误!\nformat_position_item:position={:?}", position);
- panic!("bybit_swap:格式化持仓模式错误!\nformat_position_item:position={:?}", position)
- }
- };
- let size_str: String = from_value(position["size"].clone()).unwrap();
- let size = Decimal::from_str(size_str.as_str()).unwrap();
- let side = position["side"].as_str().unwrap().to_string();
- let amount = size * ct_val;
- let mut profit = Decimal::ZERO;
- let profit_str = position["unrealisedPnl"].as_str().unwrap_or("0");
- if profit_str != "" {
- profit = Decimal::from_str(profit_str).unwrap();
- }
- match position_mode {
- PositionModeEnum::Both => {
- position_mode = match side.as_str() {
- "Buy" => PositionModeEnum::Long,
- "Sell" => PositionModeEnum::Short,
- _ => { PositionModeEnum::Both }
- }
- }
- _ => {}
- }
- Position {
- symbol: position["symbol"].as_str().unwrap_or("").parse().unwrap(),
- margin_level: Decimal::from_str(position["leverage"].as_str().unwrap()).unwrap(),
- amount,
- frozen_amount: Decimal::ZERO,
- price: Decimal::from_str(position["avgPrice"].as_str().unwrap()).unwrap(),
- profit,
- position_mode,
- margin: Decimal::from_str(position["positionBalance"].as_str().unwrap()).unwrap(),
- }
- }
- fn format_cancel_order_item(order: Value) -> Order {
- Order {
- id: format!("{}", order["orderId"].as_str().unwrap()),
- custom_id: order["orderLinkId"].as_str().unwrap().replace("t-my-custom-id_", "").replace("t-", ""),
- price: Decimal::ZERO,
- amount: Decimal::ZERO,
- deal_amount: Decimal::ZERO,
- avg_price: Decimal::ZERO,
- status: "REMOVE".to_string(),
- order_type: "limit".to_string(),
- trace_stack: TraceStack::new(0, Instant::now()).on_special("688 trace_stack".to_string())
- }
- }
- fn format_new_order_item(order: Value, price: Decimal, amount: Decimal) -> Order {
- Order {
- id: format!("{}", order["orderId"].as_str().unwrap()),
- custom_id: order["orderLinkId"].as_str().unwrap().replace("t-my-custom-id_", "").replace("t-", ""),
- price,
- amount,
- deal_amount: Decimal::ZERO,
- avg_price: price,
- status: "NEW".to_string(),
- order_type: "limit".to_string(),
- trace_stack: TraceStack::new(0, Instant::now()).on_special("688 trace_stack".to_string())
- }
- }
- pub fn format_order_item(order: Value, ct_val: Decimal) -> Order {
- let status = order["orderStatus"].as_str().unwrap_or("");
- let text = order["orderLinkId"].as_str().unwrap_or("");
- let mut size = Decimal::ZERO;
- let mut deal_amount = Decimal::ZERO;
- let mut avg_price = Decimal::ZERO;
- let right_str = order["cumExecQty"].to_string();
- let size_str = order["qty"].to_string();
- if !order.get("qty").is_some() {
- size = Decimal::from_str(size_str.as_str()).unwrap();
- let right_val = Decimal::from_str(order["cumExecValue"].as_str().unwrap()).unwrap();
- let right = Decimal::from_str(right_str.as_str()).unwrap();
- if right != Decimal::ZERO {
- avg_price = right_val / right;
- }
- deal_amount = right * ct_val;
- }
- let amount = size * ct_val;
- let custom_status = if status == "Filled" || status == "Cancelled" { "REMOVE".to_string() } else if status == "New" { "NEW".to_string() } else {
- "NULL".to_string()
- };
- let rst_order = Order {
- id: format!("{}", order["orderId"].as_str().unwrap()),
- custom_id: text.replace("t-my-custom-id_", "").replace("t-", ""),
- price: Decimal::from_str(order["price"].as_str().unwrap()).unwrap(),
- amount,
- deal_amount,
- avg_price,
- status: custom_status,
- order_type: "limit".to_string(),
- trace_stack: TraceStack::new(0, Instant::now()).on_special("688 trace_stack".to_string()),
- };
- return rst_order;
- }
|