|
|
@@ -3,7 +3,6 @@ use std::str::FromStr;
|
|
|
use std::sync::Arc;
|
|
|
use std::sync::atomic::AtomicBool;
|
|
|
use std::time::Duration;
|
|
|
-use chrono::{Utc};
|
|
|
use rust_decimal::Decimal;
|
|
|
use serde_json::Value;
|
|
|
use tokio::spawn;
|
|
|
@@ -16,20 +15,19 @@ use exchanges::binance_swap_ws::{BinanceSubscribeType, BinanceSwapWs, BinanceWsT
|
|
|
use exchanges::gate_swap_rest::GateSwapRest;
|
|
|
use exchanges::gate_swap_ws::{GateSubscribeType, GateSwapWs, GateWsType};
|
|
|
use exchanges::kucoin_swap_ws::{KucoinSubscribeType, KucoinSwapWs, KucoinWsType};
|
|
|
-use global::delay_time::DelayTime;
|
|
|
use standard::exchange::ExchangeEnum::{BinanceSpot, BinanceSwap, GateSwap, KucoinSwap};
|
|
|
use standard::SpecialTicker;
|
|
|
use crate::model::{OrderInfo, OriginalTicker, OriginalTradeBa, OriginalTradeGa};
|
|
|
use crate::quant::Quant;
|
|
|
|
|
|
// 交易交易所启动
|
|
|
-pub async fn run_transactional_exchange(bool_v1 :Arc<AtomicBool>, exchange_name: String, quant_arc: Arc<Mutex<Quant>>, name: String, symbols: Vec<String>, exchange_params: BTreeMap<String, String>, time_record: Arc<Mutex<DelayTime>>){
|
|
|
+pub async fn run_transactional_exchange(bool_v1 :Arc<AtomicBool>, exchange_name: String, quant_arc: Arc<Mutex<Quant>>, name: String, symbols: Vec<String>, exchange_params: BTreeMap<String, String>){
|
|
|
match exchange_name.as_str() {
|
|
|
"gate_usdt_swap" => {
|
|
|
- gate_swap_run(bool_v1, time_record, 1i8, quant_arc, name, symbols, exchange_params).await;
|
|
|
+ gate_swap_run(bool_v1, 1i8, quant_arc, name, symbols, exchange_params).await;
|
|
|
}
|
|
|
"kucoin_usdt_swap" => {
|
|
|
- kucoin_swap_run(bool_v1, time_record, 1i8, quant_arc, name, symbols, exchange_params).await;
|
|
|
+ kucoin_swap_run(bool_v1, 1i8, quant_arc, name, symbols, exchange_params).await;
|
|
|
}
|
|
|
_ => {
|
|
|
error!("交易交易所启动失败,参数错误!")
|
|
|
@@ -38,19 +36,19 @@ pub async fn run_transactional_exchange(bool_v1 :Arc<AtomicBool>, exchange_name:
|
|
|
}
|
|
|
|
|
|
// 参考交易所启动
|
|
|
-pub async fn run_reference_exchange(bool_v1 :Arc<AtomicBool>, exchange_name: String, quant_arc: Arc<Mutex<Quant>>, name: String, symbols: Vec<String>, exchange_params: BTreeMap<String, String>, time_record: Arc<Mutex<DelayTime>>){
|
|
|
+pub async fn run_reference_exchange(bool_v1 :Arc<AtomicBool>, exchange_name: String, quant_arc: Arc<Mutex<Quant>>, name: String, symbols: Vec<String>, exchange_params: BTreeMap<String, String>){
|
|
|
match exchange_name.as_str() {
|
|
|
"binance_usdt_swap" => {
|
|
|
- reference_binance_swap_run(bool_v1,time_record, quant_arc, name, symbols, exchange_params).await;
|
|
|
+ reference_binance_swap_run(bool_v1, quant_arc, name, symbols, exchange_params).await;
|
|
|
}
|
|
|
"binance_usdt_spot" => {
|
|
|
- reference_binance_spot_run(bool_v1,time_record, quant_arc, name, symbols, exchange_params).await;
|
|
|
+ reference_binance_spot_run(bool_v1, quant_arc, name, symbols, exchange_params).await;
|
|
|
}
|
|
|
"gate_usdt_swap" => {
|
|
|
- gate_swap_run(bool_v1, time_record,0i8, quant_arc, name, symbols, exchange_params).await;
|
|
|
+ gate_swap_run(bool_v1,0i8, quant_arc, name, symbols, exchange_params).await;
|
|
|
}
|
|
|
"kucoin_usdt_swap" => {
|
|
|
- kucoin_swap_run(bool_v1, time_record,0i8, quant_arc, name, symbols, exchange_params).await;
|
|
|
+ kucoin_swap_run(bool_v1,0i8, quant_arc, name, symbols, exchange_params).await;
|
|
|
}
|
|
|
_ => {
|
|
|
error!("参考交易所启动失败,参数错误!")
|
|
|
@@ -59,7 +57,7 @@ pub async fn run_reference_exchange(bool_v1 :Arc<AtomicBool>, exchange_name: Str
|
|
|
}
|
|
|
|
|
|
// 1交易、0参考 gate 合约 启动
|
|
|
-async fn gate_swap_run(bool_v1 :Arc<AtomicBool>, time_record: Arc<Mutex<DelayTime>>, type_num: i8, quant_arc: Arc<Mutex<Quant>>, name: String, symbols: Vec<String>, exchange_params: BTreeMap<String, String>){
|
|
|
+async fn gate_swap_run(bool_v1 :Arc<AtomicBool>, type_num: i8, quant_arc: Arc<Mutex<Quant>>, name: String, symbols: Vec<String>, exchange_params: BTreeMap<String, String>){
|
|
|
let (tx, mut rx) = channel(100);
|
|
|
let mut gate_exc = GateSwapRest::new(false, exchange_params.clone());
|
|
|
// 获取user_id
|
|
|
@@ -118,16 +116,16 @@ async fn gate_swap_run(bool_v1 :Arc<AtomicBool>, time_record: Arc<Mutex<DelayTim
|
|
|
continue;
|
|
|
}
|
|
|
if data.channel == "futures.order_book" {
|
|
|
- let mut time_delay = time_record.lock().await;
|
|
|
- time_delay.network = data.time.clone();
|
|
|
- time_delay.format_start = Utc::now().timestamp_micros();
|
|
|
+ // let mut time_delay = time_record.lock().await;
|
|
|
+ // time_delay.network = data.time.clone();
|
|
|
+ // time_delay.format_start = Utc::now().timestamp_micros();
|
|
|
let depth = standard::handle_info::HandleSwapInfo::handle_special_depth(GateSwap, data);
|
|
|
- time_delay.format_end = Utc::now().timestamp_micros();
|
|
|
+ // time_delay.format_end = Utc::now().timestamp_micros();
|
|
|
{
|
|
|
let mut quant = bot_arc_clone.lock().await;
|
|
|
- time_delay.quant_start = Utc::now().timestamp_micros();
|
|
|
+ // time_delay.quant_start = Utc::now().timestamp_micros();
|
|
|
quant._update_ticker(depth.ticker.clone(), depth.name.clone());
|
|
|
- quant._update_depth(depth.depth.clone(), depth.name.clone(), &mut time_delay);
|
|
|
+ quant._update_depth(depth.depth.clone(), depth.name.clone());
|
|
|
quant.local_depths.insert(depth.name, depth.depth);
|
|
|
}
|
|
|
} else if data.channel == "futures.balances" {
|
|
|
@@ -137,9 +135,9 @@ async fn gate_swap_run(bool_v1 :Arc<AtomicBool>, time_record: Arc<Mutex<DelayTim
|
|
|
quant.update_equity(account);
|
|
|
}
|
|
|
} else if data.channel == "futures.orders" {
|
|
|
- let mut time_delay = time_record.lock().await;
|
|
|
- time_delay.network = data.time.clone();
|
|
|
- time_delay.format_start = Utc::now().timestamp_micros();
|
|
|
+ // let mut time_delay = time_record.lock().await;
|
|
|
+ // time_delay.network = data.time.clone();
|
|
|
+ // time_delay.format_start = Utc::now().timestamp_micros();
|
|
|
let orders = standard::handle_info::HandleSwapInfo::handle_order(GateSwap, data.clone(), multiplier.clone());
|
|
|
let mut order_infos:Vec<OrderInfo> = Vec::new();
|
|
|
for order in orders.order {
|
|
|
@@ -159,11 +157,11 @@ async fn gate_swap_run(bool_v1 :Arc<AtomicBool>, time_record: Arc<Mutex<DelayTim
|
|
|
};
|
|
|
order_infos.push(order_info);
|
|
|
}
|
|
|
- time_delay.format_end = Utc::now().timestamp_micros();
|
|
|
+ // time_delay.format_end = Utc::now().timestamp_micros();
|
|
|
{
|
|
|
let mut quant = bot_arc_clone.lock().await;
|
|
|
- time_delay.quant_start = Utc::now().timestamp_micros();
|
|
|
- quant.update_order(order_infos, &mut time_delay);
|
|
|
+ // time_delay.quant_start = Utc::now().timestamp_micros();
|
|
|
+ quant.update_order(order_infos);
|
|
|
}
|
|
|
} else if data.channel == "futures.positions" {
|
|
|
let positions = standard::handle_info::HandleSwapInfo::handle_position(GateSwap,data, multiplier.clone());
|
|
|
@@ -201,7 +199,7 @@ async fn gate_swap_run(bool_v1 :Arc<AtomicBool>, time_record: Arc<Mutex<DelayTim
|
|
|
}
|
|
|
|
|
|
// 1交易、0参考 kucoin 合约 启动
|
|
|
-async fn kucoin_swap_run(bool_v1 :Arc<AtomicBool>, time_record : Arc<Mutex<DelayTime>>, type_num: i8, quant_arc: Arc<Mutex<Quant>>, name: String, symbols: Vec<String>, exchange_params: BTreeMap<String, String>){
|
|
|
+async fn kucoin_swap_run(bool_v1 :Arc<AtomicBool>, type_num: i8, quant_arc: Arc<Mutex<Quant>>, name: String, symbols: Vec<String>, exchange_params: BTreeMap<String, String>){
|
|
|
let (tx, mut rx) = channel(100);
|
|
|
let symbols_clone = symbols.clone();
|
|
|
let mut symbol_arr = Vec::new();
|
|
|
@@ -247,16 +245,16 @@ async fn kucoin_swap_run(bool_v1 :Arc<AtomicBool>, time_record : Arc<Mutex<Delay
|
|
|
continue;
|
|
|
}
|
|
|
if data.channel == "level2" {
|
|
|
- let mut time_delay = time_record.lock().await;
|
|
|
- time_delay.network = data.time.clone();
|
|
|
- time_delay.format_start = Utc::now().timestamp_micros();
|
|
|
+ // let mut time_delay = time_record.lock().await;
|
|
|
+ // time_delay.network = data.time.clone();
|
|
|
+ // time_delay.format_start = Utc::now().timestamp_micros();
|
|
|
let depth = standard::handle_info::HandleSwapInfo::handle_special_depth(KucoinSwap,data);
|
|
|
- time_delay.format_end = Utc::now().timestamp_micros();
|
|
|
+ // time_delay.format_end = Utc::now().timestamp_micros();
|
|
|
{
|
|
|
let mut quant = bot_arc_clone.lock().await;
|
|
|
- time_delay.quant_start = Utc::now().timestamp_micros();
|
|
|
+ // time_delay.quant_start = Utc::now().timestamp_micros();
|
|
|
quant._update_ticker(depth.ticker.clone(), depth.name.clone());
|
|
|
- quant._update_depth(depth.depth.clone(), depth.name.clone(), &mut time_delay);
|
|
|
+ quant._update_depth(depth.depth.clone(), depth.name.clone());
|
|
|
quant.local_depths.insert(depth.name, depth.depth);
|
|
|
}
|
|
|
} else if data.channel == "tickerV2" {
|
|
|
@@ -273,9 +271,9 @@ async fn kucoin_swap_run(bool_v1 :Arc<AtomicBool>, time_record : Arc<Mutex<Delay
|
|
|
quant.update_equity(account);
|
|
|
}
|
|
|
} else if data.channel == "symbolOrderChange" {
|
|
|
- let mut time_delay = time_record.lock().await;
|
|
|
- time_delay.network = data.time.clone();
|
|
|
- time_delay.format_start = Utc::now().timestamp_micros();
|
|
|
+ // let mut time_delay = time_record.lock().await;
|
|
|
+ // time_delay.network = data.time.clone();
|
|
|
+ // time_delay.format_start = Utc::now().timestamp_micros();
|
|
|
let orders = standard::handle_info::HandleSwapInfo::handle_order(KucoinSwap, data.clone(), multiplier);
|
|
|
let mut order_infos:Vec<OrderInfo> = Vec::new();
|
|
|
for order in orders.order {
|
|
|
@@ -298,11 +296,11 @@ async fn kucoin_swap_run(bool_v1 :Arc<AtomicBool>, time_record : Arc<Mutex<Delay
|
|
|
};
|
|
|
order_infos.push(order_info);
|
|
|
}
|
|
|
- time_delay.format_end = Utc::now().timestamp_micros();
|
|
|
+ // time_delay.format_end = Utc::now().timestamp_micros();
|
|
|
{
|
|
|
let mut quant = bot_arc_clone.lock().await;
|
|
|
- time_delay.quant_start = Utc::now().timestamp_micros();
|
|
|
- quant.update_order(order_infos, &mut time_delay);
|
|
|
+ // time_delay.quant_start = Utc::now().timestamp_micros();
|
|
|
+ quant.update_order(order_infos);
|
|
|
}
|
|
|
} else if data.channel == "position.change" {
|
|
|
let positions = standard::handle_info::HandleSwapInfo::handle_position(KucoinSwap,data, multiplier);
|
|
|
@@ -340,7 +338,7 @@ async fn kucoin_swap_run(bool_v1 :Arc<AtomicBool>, time_record : Arc<Mutex<Delay
|
|
|
}
|
|
|
|
|
|
// 参考 币安 现货 启动
|
|
|
-async fn reference_binance_spot_run(bool_v1 :Arc<AtomicBool>, time_record : Arc<Mutex<DelayTime>>, quant_arc: Arc<Mutex<Quant>>, name: String, symbols: Vec<String>, exchange_params: BTreeMap<String, String>){
|
|
|
+async fn reference_binance_spot_run(bool_v1 :Arc<AtomicBool>, quant_arc: Arc<Mutex<Quant>>, name: String, symbols: Vec<String>, exchange_params: BTreeMap<String, String>){
|
|
|
let (tx, mut rx) = channel(100);
|
|
|
spawn(async move {
|
|
|
let mut ba_exc = BinanceSpotWs::new_label(name, false, exchange_params, BinanceSpotWsType::PublicAndPrivate, tx);
|
|
|
@@ -384,22 +382,22 @@ async fn reference_binance_spot_run(bool_v1 :Arc<AtomicBool>, time_record : Arc<
|
|
|
quant.max_buy_min_sell_cache.insert(data.label, vec![max_buy, min_sell]);
|
|
|
}
|
|
|
} else if data.channel == "bookTicker" {
|
|
|
- let mut time_delay = time_record.lock().await;
|
|
|
- time_delay.network = data.time.clone();
|
|
|
- time_delay.format_start = Utc::now().timestamp_micros();
|
|
|
+ // let mut time_delay = time_record.lock().await;
|
|
|
+ // time_delay.network = data.time.clone();
|
|
|
+ // time_delay.format_start = Utc::now().timestamp_micros();
|
|
|
let ticker: OriginalTicker = serde_json::from_str(data.data.as_str()).unwrap();
|
|
|
- time_delay.format_end = Utc::now().timestamp_micros();
|
|
|
+ // time_delay.format_end = Utc::now().timestamp_micros();
|
|
|
if ticker.u > update_flag_u {
|
|
|
{
|
|
|
let mut quant = bot_arc_clone.lock().await;
|
|
|
- time_delay.quant_start = Utc::now().timestamp_micros();
|
|
|
+ // time_delay.quant_start = Utc::now().timestamp_micros();
|
|
|
quant._update_ticker(SpecialTicker{
|
|
|
sell: ticker.a.clone(),
|
|
|
buy: ticker.b.clone(),
|
|
|
mid_price: Default::default(),
|
|
|
}, data.label.clone());
|
|
|
let depth = vec![ticker.b, ticker.B, ticker.a, ticker.A];
|
|
|
- quant._update_depth(depth.clone(), data.label.clone(), &mut time_delay);
|
|
|
+ quant._update_depth(depth.clone(), data.label.clone());
|
|
|
quant.local_depths.insert(data.label.clone(), depth);
|
|
|
}
|
|
|
} else {
|
|
|
@@ -410,16 +408,16 @@ async fn reference_binance_spot_run(bool_v1 :Arc<AtomicBool>, time_record : Arc<
|
|
|
let v: Value = serde_json::from_str(data.data.clone().as_str()).unwrap();
|
|
|
let u = v["lastUpdateId"].as_i64().unwrap();
|
|
|
if u > update_flag_u {
|
|
|
- let mut time_delay = time_record.lock().await;
|
|
|
- time_delay.network = data.time.clone();
|
|
|
- time_delay.format_start = Utc::now().timestamp_micros();
|
|
|
+ // let mut time_delay = time_record.lock().await;
|
|
|
+ // time_delay.network = data.time.clone();
|
|
|
+ // time_delay.format_start = Utc::now().timestamp_micros();
|
|
|
let depth = standard::handle_info::HandleSwapInfo::handle_special_depth(BinanceSpot, data);
|
|
|
- time_delay.format_end = Utc::now().timestamp_micros();
|
|
|
+ // time_delay.format_end = Utc::now().timestamp_micros();
|
|
|
{
|
|
|
let mut quant = bot_arc_clone.lock().await;
|
|
|
- time_delay.quant_start = Utc::now().timestamp_micros();
|
|
|
+ // time_delay.quant_start = Utc::now().timestamp_micros();
|
|
|
quant._update_ticker(depth.ticker.clone(), depth.name.clone());
|
|
|
- quant._update_depth(depth.depth.clone(), depth.name.clone(), &mut time_delay);
|
|
|
+ quant._update_depth(depth.depth.clone(), depth.name.clone());
|
|
|
quant.local_depths.insert(depth.name, depth.depth);
|
|
|
}
|
|
|
|
|
|
@@ -441,7 +439,7 @@ async fn reference_binance_spot_run(bool_v1 :Arc<AtomicBool>, time_record : Arc<
|
|
|
|
|
|
|
|
|
// 参考 币安 合约 启动
|
|
|
-async fn reference_binance_swap_run(bool_v1 :Arc<AtomicBool>, time_record : Arc<Mutex<DelayTime>>, quant_arc: Arc<Mutex<Quant>>, name: String, symbols: Vec<String>, exchange_params: BTreeMap<String, String>){
|
|
|
+async fn reference_binance_swap_run(bool_v1 :Arc<AtomicBool>, quant_arc: Arc<Mutex<Quant>>, name: String, symbols: Vec<String>, exchange_params: BTreeMap<String, String>){
|
|
|
let (tx, mut rx) = channel(100);
|
|
|
spawn( async move {
|
|
|
let mut ba_exc = BinanceSwapWs::new_label(name, false, exchange_params, BinanceWsType::PublicAndPrivate, tx);
|
|
|
@@ -484,22 +482,22 @@ async fn reference_binance_swap_run(bool_v1 :Arc<AtomicBool>, time_record : Arc<
|
|
|
quant.max_buy_min_sell_cache.insert(data.label, vec![max_buy, min_sell]);
|
|
|
}
|
|
|
} else if data.channel == "bookTicker" {
|
|
|
- let mut time_delay = time_record.lock().await;
|
|
|
- time_delay.network = data.time.clone();
|
|
|
- time_delay.format_start = Utc::now().timestamp_micros();
|
|
|
+ // let mut time_delay = time_record.lock().await;
|
|
|
+ // time_delay.network = data.time.clone();
|
|
|
+ // time_delay.format_start = Utc::now().timestamp_micros();
|
|
|
let ticker: OriginalTicker = serde_json::from_str(data.data.as_str()).unwrap();
|
|
|
- time_delay.format_end = Utc::now().timestamp_micros();
|
|
|
+ // time_delay.format_end = Utc::now().timestamp_micros();
|
|
|
if ticker.u > update_flag_u {
|
|
|
{
|
|
|
let mut quant = bot_arc_clone.lock().await;
|
|
|
- time_delay.quant_start = Utc::now().timestamp_micros();
|
|
|
+ // time_delay.quant_start = Utc::now().timestamp_micros();
|
|
|
quant._update_ticker(SpecialTicker{
|
|
|
sell: ticker.a.clone(),
|
|
|
buy: ticker.b.clone(),
|
|
|
mid_price: Default::default(),
|
|
|
}, data.label.clone());
|
|
|
let depth = vec![ticker.b, ticker.B, ticker.a, ticker.A];
|
|
|
- quant._update_depth(depth.clone(), data.label.clone(), &mut time_delay);
|
|
|
+ quant._update_depth(depth.clone(), data.label.clone());
|
|
|
quant.local_depths.insert(data.label.clone(), depth);
|
|
|
}
|
|
|
} else {
|
|
|
@@ -510,16 +508,15 @@ async fn reference_binance_swap_run(bool_v1 :Arc<AtomicBool>, time_record : Arc<
|
|
|
let v: Value = serde_json::from_str(data.data.clone().as_str()).unwrap();
|
|
|
let u = v["u"].as_i64().unwrap();
|
|
|
if u > update_flag_u {
|
|
|
- let mut time_delay = time_record.lock().await;
|
|
|
- time_delay.network = data.time.clone();
|
|
|
- time_delay.format_start = Utc::now().timestamp_micros();
|
|
|
+ // time_delay.network = data.time.clone();
|
|
|
+ // time_delay.format_start = Utc::now().timestamp_micros();
|
|
|
let depth = standard::handle_info::HandleSwapInfo::handle_special_depth(BinanceSwap, data);
|
|
|
- time_delay.format_end = Utc::now().timestamp_micros();
|
|
|
+ // time_delay.format_end = Utc::now().timestamp_micros();
|
|
|
{
|
|
|
let mut quant = bot_arc_clone.lock().await;
|
|
|
- time_delay.quant_start = Utc::now().timestamp_micros();
|
|
|
+ // time_delay.quant_start = Utc::now().timestamp_micros();
|
|
|
quant._update_ticker(depth.ticker.clone(), depth.name.clone());
|
|
|
- quant._update_depth(depth.depth.clone(), depth.name.clone(), &mut time_delay);
|
|
|
+ quant._update_depth(depth.depth.clone(), depth.name.clone());
|
|
|
quant.local_depths.insert(depth.name, depth.depth);
|
|
|
}
|
|
|
} else {
|