|
|
@@ -3,6 +3,7 @@ use std::str::FromStr;
|
|
|
use std::sync::Arc;
|
|
|
use std::sync::atomic::AtomicBool;
|
|
|
use std::time::Duration;
|
|
|
+use chrono::{Utc};
|
|
|
use rust_decimal::Decimal;
|
|
|
use serde_json::Value;
|
|
|
use tokio::spawn;
|
|
|
@@ -15,19 +16,20 @@ use exchanges::binance_swap_ws::{BinanceSubscribeType, BinanceSwapWs, BinanceWsT
|
|
|
use exchanges::gate_swap_rest::GateSwapRest;
|
|
|
use exchanges::gate_swap_ws::{GateSubscribeType, GateSwapWs, GateWsType};
|
|
|
use exchanges::kucoin_swap_ws::{KucoinSubscribeType, KucoinSwapWs, KucoinWsType};
|
|
|
+use global::delay_time::DelayTime;
|
|
|
use standard::exchange::ExchangeEnum::{BinanceSpot, BinanceSwap, GateSwap, KucoinSwap};
|
|
|
use standard::SpecialTicker;
|
|
|
use crate::model::{OrderInfo, OriginalTicker, OriginalTradeBa, OriginalTradeGa};
|
|
|
use crate::quant::Quant;
|
|
|
|
|
|
// 交易交易所启动
|
|
|
-pub async fn run_transactional_exchange(bool_v1 :Arc<AtomicBool>, exchange_name: String, quant_arc: Arc<Mutex<Quant>>, name: String, symbols: Vec<String>, exchange_params: BTreeMap<String, String>){
|
|
|
+pub async fn run_transactional_exchange(bool_v1 :Arc<AtomicBool>, exchange_name: String, quant_arc: Arc<Mutex<Quant>>, name: String, symbols: Vec<String>, exchange_params: BTreeMap<String, String>, time_record: Arc<Mutex<DelayTime>>){
|
|
|
match exchange_name.as_str() {
|
|
|
"gate_usdt_swap" => {
|
|
|
- gate_swap_run(bool_v1, 1i8, quant_arc, name, symbols, exchange_params).await;
|
|
|
+ gate_swap_run(bool_v1, time_record, 1i8, quant_arc, name, symbols, exchange_params).await;
|
|
|
}
|
|
|
"kucoin_usdt_swap" => {
|
|
|
- kucoin_swap_run(bool_v1, 1i8, quant_arc, name, symbols, exchange_params).await;
|
|
|
+ kucoin_swap_run(bool_v1, time_record, 1i8, quant_arc, name, symbols, exchange_params).await;
|
|
|
}
|
|
|
_ => {
|
|
|
error!("交易交易所启动失败,参数错误!")
|
|
|
@@ -36,19 +38,19 @@ pub async fn run_transactional_exchange(bool_v1 :Arc<AtomicBool>, exchange_name:
|
|
|
}
|
|
|
|
|
|
// 参考交易所启动
|
|
|
-pub async fn run_reference_exchange(bool_v1 :Arc<AtomicBool>, exchange_name: String, quant_arc: Arc<Mutex<Quant>>, name: String, symbols: Vec<String>, exchange_params: BTreeMap<String, String>){
|
|
|
+pub async fn run_reference_exchange(bool_v1 :Arc<AtomicBool>, exchange_name: String, quant_arc: Arc<Mutex<Quant>>, name: String, symbols: Vec<String>, exchange_params: BTreeMap<String, String>, time_record: Arc<Mutex<DelayTime>>){
|
|
|
match exchange_name.as_str() {
|
|
|
"binance_usdt_swap" => {
|
|
|
- reference_binance_swap_run(bool_v1, quant_arc, name, symbols, exchange_params).await;
|
|
|
+ reference_binance_swap_run(bool_v1,time_record, quant_arc, name, symbols, exchange_params).await;
|
|
|
}
|
|
|
"binance_usdt_spot" => {
|
|
|
- reference_binance_spot_run(bool_v1, quant_arc, name, symbols, exchange_params).await;
|
|
|
+ reference_binance_spot_run(bool_v1,time_record, quant_arc, name, symbols, exchange_params).await;
|
|
|
}
|
|
|
"gate_usdt_swap" => {
|
|
|
- gate_swap_run(bool_v1, 0i8, quant_arc, name, symbols, exchange_params).await;
|
|
|
+ gate_swap_run(bool_v1, time_record,0i8, quant_arc, name, symbols, exchange_params).await;
|
|
|
}
|
|
|
"kucoin_usdt_swap" => {
|
|
|
- kucoin_swap_run(bool_v1, 0i8, quant_arc, name, symbols, exchange_params).await;
|
|
|
+ kucoin_swap_run(bool_v1, time_record,0i8, quant_arc, name, symbols, exchange_params).await;
|
|
|
}
|
|
|
_ => {
|
|
|
error!("参考交易所启动失败,参数错误!")
|
|
|
@@ -57,7 +59,7 @@ pub async fn run_reference_exchange(bool_v1 :Arc<AtomicBool>, exchange_name: Str
|
|
|
}
|
|
|
|
|
|
// 1交易、0参考 gate 合约 启动
|
|
|
-async fn gate_swap_run(bool_v1 :Arc<AtomicBool>, type_num: i8, quant_arc: Arc<Mutex<Quant>>, name: String, symbols: Vec<String>, exchange_params: BTreeMap<String, String>){
|
|
|
+async fn gate_swap_run(bool_v1 :Arc<AtomicBool>, time_record: Arc<Mutex<DelayTime>>, type_num: i8, quant_arc: Arc<Mutex<Quant>>, name: String, symbols: Vec<String>, exchange_params: BTreeMap<String, String>){
|
|
|
let (tx, mut rx) = channel(100);
|
|
|
let mut gate_exc = GateSwapRest::new(false, exchange_params.clone());
|
|
|
// 获取user_id
|
|
|
@@ -116,11 +118,16 @@ async fn gate_swap_run(bool_v1 :Arc<AtomicBool>, type_num: i8, quant_arc: Arc<Mu
|
|
|
continue;
|
|
|
}
|
|
|
if data.channel == "futures.order_book" {
|
|
|
- let depth = standard::gate_handle::handle_special_depth(data);
|
|
|
+ let mut time_delay = time_record.lock().await;
|
|
|
+ time_delay.network = data.time.clone();
|
|
|
+ time_delay.format_start = Utc::now().timestamp_micros();
|
|
|
+ let depth = standard::handle_info::HandleSwapInfo::handle_special_depth(GateSwap, data);
|
|
|
+ time_delay.format_end = Utc::now().timestamp_micros();
|
|
|
{
|
|
|
let mut quant = bot_arc_clone.lock().await;
|
|
|
+ time_delay.quant_start = Utc::now().timestamp_micros();
|
|
|
quant._update_ticker(depth.ticker.clone(), depth.name.clone());
|
|
|
- quant._update_depth(depth.depth.clone(), depth.name.clone());
|
|
|
+ quant._update_depth(depth.depth.clone(), depth.name.clone(), &mut time_delay);
|
|
|
quant.local_depths.insert(depth.name, depth.depth);
|
|
|
}
|
|
|
} else if data.channel == "futures.balances" {
|
|
|
@@ -130,6 +137,9 @@ async fn gate_swap_run(bool_v1 :Arc<AtomicBool>, type_num: i8, quant_arc: Arc<Mu
|
|
|
quant.update_equity(account);
|
|
|
}
|
|
|
} else if data.channel == "futures.orders" {
|
|
|
+ let mut time_delay = time_record.lock().await;
|
|
|
+ time_delay.network = data.time.clone();
|
|
|
+ time_delay.format_start = Utc::now().timestamp_micros();
|
|
|
let orders = standard::handle_info::HandleSwapInfo::handle_order(GateSwap, data.clone(), multiplier.clone());
|
|
|
let mut order_infos:Vec<OrderInfo> = Vec::new();
|
|
|
for order in orders.order {
|
|
|
@@ -149,9 +159,11 @@ async fn gate_swap_run(bool_v1 :Arc<AtomicBool>, type_num: i8, quant_arc: Arc<Mu
|
|
|
};
|
|
|
order_infos.push(order_info);
|
|
|
}
|
|
|
+ time_delay.format_end = Utc::now().timestamp_micros();
|
|
|
{
|
|
|
let mut quant = bot_arc_clone.lock().await;
|
|
|
- quant.update_order(order_infos);
|
|
|
+ time_delay.quant_start = Utc::now().timestamp_micros();
|
|
|
+ quant.update_order(order_infos, &mut time_delay);
|
|
|
}
|
|
|
} else if data.channel == "futures.positions" {
|
|
|
let positions = standard::handle_info::HandleSwapInfo::handle_position(GateSwap,data, multiplier.clone());
|
|
|
@@ -189,7 +201,7 @@ async fn gate_swap_run(bool_v1 :Arc<AtomicBool>, type_num: i8, quant_arc: Arc<Mu
|
|
|
}
|
|
|
|
|
|
// 1交易、0参考 kucoin 合约 启动
|
|
|
-async fn kucoin_swap_run(bool_v1 :Arc<AtomicBool>, type_num: i8, quant_arc: Arc<Mutex<Quant>>, name: String, symbols: Vec<String>, exchange_params: BTreeMap<String, String>){
|
|
|
+async fn kucoin_swap_run(bool_v1 :Arc<AtomicBool>, time_record : Arc<Mutex<DelayTime>>, type_num: i8, quant_arc: Arc<Mutex<Quant>>, name: String, symbols: Vec<String>, exchange_params: BTreeMap<String, String>){
|
|
|
let (tx, mut rx) = channel(100);
|
|
|
let symbols_clone = symbols.clone();
|
|
|
let mut symbol_arr = Vec::new();
|
|
|
@@ -235,11 +247,16 @@ async fn kucoin_swap_run(bool_v1 :Arc<AtomicBool>, type_num: i8, quant_arc: Arc<
|
|
|
continue;
|
|
|
}
|
|
|
if data.channel == "level2" {
|
|
|
+ let mut time_delay = time_record.lock().await;
|
|
|
+ time_delay.network = data.time.clone();
|
|
|
+ time_delay.format_start = Utc::now().timestamp_micros();
|
|
|
let depth = standard::handle_info::HandleSwapInfo::handle_special_depth(KucoinSwap,data);
|
|
|
+ time_delay.format_end = Utc::now().timestamp_micros();
|
|
|
{
|
|
|
let mut quant = bot_arc_clone.lock().await;
|
|
|
+ time_delay.quant_start = Utc::now().timestamp_micros();
|
|
|
quant._update_ticker(depth.ticker.clone(), depth.name.clone());
|
|
|
- quant._update_depth(depth.depth.clone(), depth.name.clone());
|
|
|
+ quant._update_depth(depth.depth.clone(), depth.name.clone(), &mut time_delay);
|
|
|
quant.local_depths.insert(depth.name, depth.depth);
|
|
|
}
|
|
|
} else if data.channel == "tickerV2" {
|
|
|
@@ -256,6 +273,9 @@ async fn kucoin_swap_run(bool_v1 :Arc<AtomicBool>, type_num: i8, quant_arc: Arc<
|
|
|
quant.update_equity(account);
|
|
|
}
|
|
|
} else if data.channel == "symbolOrderChange" {
|
|
|
+ let mut time_delay = time_record.lock().await;
|
|
|
+ time_delay.network = data.time.clone();
|
|
|
+ time_delay.format_start = Utc::now().timestamp_micros();
|
|
|
let orders = standard::handle_info::HandleSwapInfo::handle_order(KucoinSwap, data.clone(), multiplier);
|
|
|
let mut order_infos:Vec<OrderInfo> = Vec::new();
|
|
|
for order in orders.order {
|
|
|
@@ -278,9 +298,11 @@ async fn kucoin_swap_run(bool_v1 :Arc<AtomicBool>, type_num: i8, quant_arc: Arc<
|
|
|
};
|
|
|
order_infos.push(order_info);
|
|
|
}
|
|
|
+ time_delay.format_end = Utc::now().timestamp_micros();
|
|
|
{
|
|
|
let mut quant = bot_arc_clone.lock().await;
|
|
|
- quant.update_order(order_infos);
|
|
|
+ time_delay.quant_start = Utc::now().timestamp_micros();
|
|
|
+ quant.update_order(order_infos, &mut time_delay);
|
|
|
}
|
|
|
} else if data.channel == "position.change" {
|
|
|
let positions = standard::handle_info::HandleSwapInfo::handle_position(KucoinSwap,data, multiplier);
|
|
|
@@ -318,7 +340,7 @@ async fn kucoin_swap_run(bool_v1 :Arc<AtomicBool>, type_num: i8, quant_arc: Arc<
|
|
|
}
|
|
|
|
|
|
// 参考 币安 现货 启动
|
|
|
-async fn reference_binance_spot_run(bool_v1 :Arc<AtomicBool>, quant_arc: Arc<Mutex<Quant>>, name: String, symbols: Vec<String>, exchange_params: BTreeMap<String, String>){
|
|
|
+async fn reference_binance_spot_run(bool_v1 :Arc<AtomicBool>, time_record : Arc<Mutex<DelayTime>>, quant_arc: Arc<Mutex<Quant>>, name: String, symbols: Vec<String>, exchange_params: BTreeMap<String, String>){
|
|
|
let (tx, mut rx) = channel(100);
|
|
|
spawn(async move {
|
|
|
let mut ba_exc = BinanceSpotWs::new_label(name, false, exchange_params, BinanceSpotWsType::PublicAndPrivate, tx);
|
|
|
@@ -362,17 +384,22 @@ async fn reference_binance_spot_run(bool_v1 :Arc<AtomicBool>, quant_arc: Arc<Mut
|
|
|
quant.max_buy_min_sell_cache.insert(data.label, vec![max_buy, min_sell]);
|
|
|
}
|
|
|
} else if data.channel == "bookTicker" {
|
|
|
+ let mut time_delay = time_record.lock().await;
|
|
|
+ time_delay.network = data.time.clone();
|
|
|
+ time_delay.format_start = Utc::now().timestamp_micros();
|
|
|
let ticker: OriginalTicker = serde_json::from_str(data.data.as_str()).unwrap();
|
|
|
+ time_delay.format_end = Utc::now().timestamp_micros();
|
|
|
if ticker.u > update_flag_u {
|
|
|
{
|
|
|
let mut quant = bot_arc_clone.lock().await;
|
|
|
+ time_delay.quant_start = Utc::now().timestamp_micros();
|
|
|
quant._update_ticker(SpecialTicker{
|
|
|
sell: ticker.a.clone(),
|
|
|
buy: ticker.b.clone(),
|
|
|
mid_price: Default::default(),
|
|
|
}, data.label.clone());
|
|
|
let depth = vec![ticker.b, ticker.B, ticker.a, ticker.A];
|
|
|
- quant._update_depth(depth.clone(), data.label.clone());
|
|
|
+ quant._update_depth(depth.clone(), data.label.clone(), &mut time_delay);
|
|
|
quant.local_depths.insert(data.label.clone(), depth);
|
|
|
}
|
|
|
} else {
|
|
|
@@ -383,11 +410,16 @@ async fn reference_binance_spot_run(bool_v1 :Arc<AtomicBool>, quant_arc: Arc<Mut
|
|
|
let v: Value = serde_json::from_str(data.data.clone().as_str()).unwrap();
|
|
|
let u = v["lastUpdateId"].as_i64().unwrap();
|
|
|
if u > update_flag_u {
|
|
|
+ let mut time_delay = time_record.lock().await;
|
|
|
+ time_delay.network = data.time.clone();
|
|
|
+ time_delay.format_start = Utc::now().timestamp_micros();
|
|
|
let depth = standard::handle_info::HandleSwapInfo::handle_special_depth(BinanceSpot, data);
|
|
|
+ time_delay.format_end = Utc::now().timestamp_micros();
|
|
|
{
|
|
|
let mut quant = bot_arc_clone.lock().await;
|
|
|
+ time_delay.quant_start = Utc::now().timestamp_micros();
|
|
|
quant._update_ticker(depth.ticker.clone(), depth.name.clone());
|
|
|
- quant._update_depth(depth.depth.clone(), depth.name.clone());
|
|
|
+ quant._update_depth(depth.depth.clone(), depth.name.clone(), &mut time_delay);
|
|
|
quant.local_depths.insert(depth.name, depth.depth);
|
|
|
}
|
|
|
|
|
|
@@ -409,7 +441,7 @@ async fn reference_binance_spot_run(bool_v1 :Arc<AtomicBool>, quant_arc: Arc<Mut
|
|
|
|
|
|
|
|
|
// 参考 币安 合约 启动
|
|
|
-async fn reference_binance_swap_run(bool_v1 :Arc<AtomicBool>, quant_arc: Arc<Mutex<Quant>>, name: String, symbols: Vec<String>, exchange_params: BTreeMap<String, String>){
|
|
|
+async fn reference_binance_swap_run(bool_v1 :Arc<AtomicBool>, time_record : Arc<Mutex<DelayTime>>, quant_arc: Arc<Mutex<Quant>>, name: String, symbols: Vec<String>, exchange_params: BTreeMap<String, String>){
|
|
|
let (tx, mut rx) = channel(100);
|
|
|
spawn( async move {
|
|
|
let mut ba_exc = BinanceSwapWs::new_label(name, false, exchange_params, BinanceWsType::PublicAndPrivate, tx);
|
|
|
@@ -452,17 +484,22 @@ async fn reference_binance_swap_run(bool_v1 :Arc<AtomicBool>, quant_arc: Arc<Mut
|
|
|
quant.max_buy_min_sell_cache.insert(data.label, vec![max_buy, min_sell]);
|
|
|
}
|
|
|
} else if data.channel == "bookTicker" {
|
|
|
+ let mut time_delay = time_record.lock().await;
|
|
|
+ time_delay.network = data.time.clone();
|
|
|
+ time_delay.format_start = Utc::now().timestamp_micros();
|
|
|
let ticker: OriginalTicker = serde_json::from_str(data.data.as_str()).unwrap();
|
|
|
+ time_delay.format_end = Utc::now().timestamp_micros();
|
|
|
if ticker.u > update_flag_u {
|
|
|
{
|
|
|
let mut quant = bot_arc_clone.lock().await;
|
|
|
+ time_delay.quant_start = Utc::now().timestamp_micros();
|
|
|
quant._update_ticker(SpecialTicker{
|
|
|
sell: ticker.a.clone(),
|
|
|
buy: ticker.b.clone(),
|
|
|
mid_price: Default::default(),
|
|
|
}, data.label.clone());
|
|
|
let depth = vec![ticker.b, ticker.B, ticker.a, ticker.A];
|
|
|
- quant._update_depth(depth.clone(), data.label.clone());
|
|
|
+ quant._update_depth(depth.clone(), data.label.clone(), &mut time_delay);
|
|
|
quant.local_depths.insert(data.label.clone(), depth);
|
|
|
}
|
|
|
} else {
|
|
|
@@ -473,11 +510,16 @@ async fn reference_binance_swap_run(bool_v1 :Arc<AtomicBool>, quant_arc: Arc<Mut
|
|
|
let v: Value = serde_json::from_str(data.data.clone().as_str()).unwrap();
|
|
|
let u = v["u"].as_i64().unwrap();
|
|
|
if u > update_flag_u {
|
|
|
+ let mut time_delay = time_record.lock().await;
|
|
|
+ time_delay.network = data.time.clone();
|
|
|
+ time_delay.format_start = Utc::now().timestamp_micros();
|
|
|
let depth = standard::handle_info::HandleSwapInfo::handle_special_depth(BinanceSwap, data);
|
|
|
+ time_delay.format_end = Utc::now().timestamp_micros();
|
|
|
{
|
|
|
let mut quant = bot_arc_clone.lock().await;
|
|
|
+ time_delay.quant_start = Utc::now().timestamp_micros();
|
|
|
quant._update_ticker(depth.ticker.clone(), depth.name.clone());
|
|
|
- quant._update_depth(depth.depth.clone(), depth.name.clone());
|
|
|
+ quant._update_depth(depth.depth.clone(), depth.name.clone(), &mut time_delay);
|
|
|
quant.local_depths.insert(depth.name, depth.depth);
|
|
|
}
|
|
|
} else {
|