Quellcode durchsuchen

支持币安现货参考

skyfffire vor 1 Jahr
Ursprung
Commit
4d90b929ed
2 geänderte Dateien mit 91 neuen und 51 gelöschten Zeilen
  1. 65 38
      strategy/src/binance_spot.rs
  2. 26 13
      strategy/src/exchange_disguise.rs

+ 65 - 38
strategy/src/binance_spot.rs

@@ -1,18 +1,64 @@
 use std::collections::BTreeMap;
 use std::sync::Arc;
 use std::sync::atomic::AtomicBool;
+use futures_util::StreamExt;
 use rust_decimal::Decimal;
-use serde_json::Value;
+use tokio::spawn;
 use tokio::sync::Mutex;
+use exchanges::binance_spot_ws::{BinanceSpotSubscribeType, BinanceSpotWs, BinanceSpotWsType};
 use exchanges::response_base::ResponseData;
 use global::trace_stack::TraceStack;
 use standard::exchange::ExchangeEnum::BinanceSpot;
-use crate::model::{OriginalTicker, OriginalTradeBa};
+use crate::exchange_disguise::on_special_depth;
+use crate::model::{OriginalTradeBa};
 use crate::quant::Quant;
 
 // 参考 币安 现货 启动
-#[allow(dead_code)]
-pub async fn reference_binance_spot_run(_bool_v1 :Arc<AtomicBool>, _quant_arc: Arc<Mutex<Quant>>, _name: String, _symbols: Vec<String>, _exchange_params: BTreeMap<String, String>){
+pub async fn reference_binance_spot_run(bool_v1 :Arc<AtomicBool>,
+                                        quant_arc: Arc<Mutex<Quant>>,
+                                        name: String,
+                                        symbols: Vec<String>,
+                                        is_colo: bool,
+                                        _exchange_params: BTreeMap<String, String>) {
+    let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
+    let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded();
+
+    let mut ws = BinanceSpotWs::new_label(name.clone(), is_colo, None, BinanceSpotWsType::PublicAndPrivate);
+    ws.set_symbols(symbols.clone());
+    ws.set_subscribe(vec![
+        // TODO 此处应该使用BookTicker,等标准层格式化完成。
+        // BinanceSpotSubscribeType::PuBookTicker,
+        BinanceSpotSubscribeType::PuDepth20levels100ms
+    ]);
+
+    // 开启数据读取线程
+    let write_tx_am = Arc::new(Mutex::new(write_tx));
+    let bot_arc_clone = quant_arc.clone();
+
+    spawn(async move {
+        //链接
+        let bool_v3_clone = Arc::clone(&bool_v1);
+        ws.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+    });
+
+    spawn(async move {
+        // trade
+        let mut max_buy = Decimal::ZERO;
+        let mut min_sell = Decimal::ZERO;
+        // ticker
+        let mut update_flag_u = Decimal::ZERO;
+
+        loop {
+            if let Some(data) = read_rx.next().await {
+                on_data(bot_arc_clone.clone(),
+                        &mut update_flag_u,
+                        &mut max_buy,
+                        &mut min_sell,
+                        data).await;
+            }
+        }
+    });
+
     // let (tx, mut rx) = channel(100);
     // spawn(async move {
     //     let mut ba_exc = BinanceSpotWs::new_label(name, false, None, BinanceSpotWsType::PublicAndPrivate);
@@ -45,8 +91,11 @@ pub async fn reference_binance_spot_run(_bool_v1 :Arc<AtomicBool>, _quant_arc: A
     // });
 }
 
-#[allow(dead_code)]
-async fn on_data(bot_arc_clone: Arc<Mutex<Quant>>, update_flag_u: &mut i64, max_buy: &mut Decimal, min_sell: &mut Decimal, data: ResponseData) {
+async fn on_data(bot_arc_clone: Arc<Mutex<Quant>>,
+                 update_flag_u: &mut Decimal,
+                 max_buy: &mut Decimal,
+                 min_sell: &mut Decimal,
+                 data: ResponseData) {
     let mut trace_stack = TraceStack::default();
     trace_stack.on_after_network(data.time);
     trace_stack.on_before_quant();
@@ -74,39 +123,17 @@ async fn on_data(bot_arc_clone: Arc<Mutex<Quant>>, update_flag_u: &mut i64, max_
         }
     } else if data.channel == "bookTicker" {
         trace_stack.on_before_format();
-        let ticker: OriginalTicker = serde_json::from_str(data.data.as_str()).unwrap();
-        if ticker.u > *update_flag_u {
-            // {
-            //     let mut quant = bot_arc_clone.lock().await;
-            //     // time_delay.quant_start = Utc::now().timestamp_micros();
-            //     quant._update_ticker(SpecialTicker{
-            //         sell: ticker.a.clone(),
-            //         buy: ticker.b.clone(),
-            //         mid_price: Default::default(),
-            //     }, data.label.clone());
-            //     let depth = vec![ticker.b, ticker.B, ticker.a, ticker.A];
-            //     trace_stack.on_after_format();
-            //     quant._update_depth(depth.clone(), data.label.clone(), trace_stack.clone());
-            //     quant.local_depths.insert(data.label.clone(), depth);
-            // }
-        } else {
-            *update_flag_u = ticker.u;
-        }
+        let special_depth = standard::handle_info::HandleSwapInfo::handle_special_ticker(BinanceSpot, data.clone());
+        trace_stack.on_after_format();
+        trace_stack.on_before_network(special_depth.create_at);
+
+        on_special_depth(bot_arc_clone, update_flag_u, data.label, trace_stack, special_depth).await;
     } else if data.channel == "depth" {
         trace_stack.on_before_format();
-        let v: Value = serde_json::from_str(data.data.clone().as_str()).unwrap();
-        let u = v["lastUpdateId"].as_i64().unwrap();
-        if u > *update_flag_u {
-            let depth = standard::handle_info::HandleSwapInfo::handle_special_depth(BinanceSpot, data);
-            trace_stack.on_after_format();
-            {
-                let mut quant = bot_arc_clone.lock().await;
-                quant._update_ticker(depth.ticker.clone(), depth.name.clone());
-                quant._update_depth(depth.depth.clone(), depth.name.clone(), trace_stack.clone());
-                quant.local_depths.insert(depth.name, depth.depth);
-            }
-        } else {
-            *update_flag_u = u;
-        }
+        let special_depth = standard::handle_info::HandleSwapInfo::handle_special_depth(BinanceSpot, data.clone());
+        trace_stack.on_after_format();
+        trace_stack.on_before_network(special_depth.create_at);
+
+        on_special_depth(bot_arc_clone, update_flag_u, data.label, trace_stack, special_depth).await;
     }
 }

+ 26 - 13
strategy/src/exchange_disguise.rs

@@ -5,6 +5,7 @@ use rust_decimal::Decimal;
 use tokio::sync::Mutex;
 use global::trace_stack::TraceStack;
 use standard::SpecialDepth;
+use crate::binance_spot::reference_binance_spot_run;
 use crate::binance_usdt_swap::reference_binance_swap_run;
 use crate::bitget_spot::bitget_spot_run;
 use crate::gate_swap::gate_swap_run;
@@ -13,16 +14,22 @@ use crate::kucoin_swap::kucoin_swap_run;
 use crate::quant::Quant;
 
 // 交易交易所启动
-pub async fn run_transactional_exchange(bool_v1 :Arc<AtomicBool>, exchange_name: String, quant_arc: Arc<Mutex<Quant>>, name: String, symbols: Vec<String>, exchange_params: BTreeMap<String, String>){
+pub async fn run_transactional_exchange(bool_v1 :Arc<AtomicBool>,
+                                        exchange_name: String,
+                                        quant_arc: Arc<Mutex<Quant>>,
+                                        name: String,
+                                        symbols: Vec<String>,
+                                        is_colo: bool,
+                                        exchange_params: BTreeMap<String, String>) {
     match exchange_name.as_str() {
         "gate_usdt_swap" => {
-            gate_swap_run(bool_v1, 1i8, quant_arc, name, symbols, exchange_params).await;
+            gate_swap_run(bool_v1, true, quant_arc, name, symbols, is_colo, exchange_params).await;
         }
         "kucoin_usdt_swap" => {
-            kucoin_swap_run(bool_v1, 1i8, quant_arc, name, symbols, exchange_params).await;
+            kucoin_swap_run(bool_v1, true, quant_arc, name, symbols, is_colo, exchange_params).await;
         },
         "bitget_spot" => {
-            bitget_spot_run(bool_v1,1i8, quant_arc, name, symbols, exchange_params).await;
+            bitget_spot_run(bool_v1,false, quant_arc, name, symbols, is_colo, exchange_params).await;
         }
         _ => {
             let msg = format!("不支持的交易交易所:{}", exchange_name);
@@ -32,25 +39,31 @@ pub async fn run_transactional_exchange(bool_v1 :Arc<AtomicBool>, exchange_name:
 }
 
 // 参考交易所启动
-pub async fn run_reference_exchange(bool_v1 :Arc<AtomicBool>, exchange_name: String, quant_arc: Arc<Mutex<Quant>>, name: String, symbols: Vec<String>, exchange_params: BTreeMap<String, String>){
+pub async fn run_reference_exchange(bool_v1: Arc<AtomicBool>,
+                                    exchange_name: String,
+                                    quant_arc: Arc<Mutex<Quant>>,
+                                    name: String,
+                                    symbols: Vec<String>,
+                                    is_colo: bool,
+                                    exchange_params: BTreeMap<String, String>) {
     match exchange_name.as_str() {
         "binance_usdt_swap" => {
-            reference_binance_swap_run(bool_v1, quant_arc, name, symbols, exchange_params).await;
+            reference_binance_swap_run(bool_v1, quant_arc, name, symbols, is_colo, exchange_params).await;
+        },
+        "binance_spot" => {
+            reference_binance_spot_run(bool_v1, quant_arc, name, symbols, is_colo, exchange_params).await;
         },
-        // "binance_spot" => {
-        //     reference_binance_spot_run(bool_v1, quant_arc, name, symbols, exchange_params).await;
-        // },
         "gate_usdt_swap" => {
-            gate_swap_run(bool_v1, 0i8, quant_arc, name, symbols, exchange_params).await;
+            gate_swap_run(bool_v1, false, quant_arc, name, symbols, is_colo, exchange_params).await;
         },
         "kucoin_usdt_swap" => {
-            kucoin_swap_run(bool_v1, 0i8, quant_arc, name, symbols, exchange_params).await;
+            kucoin_swap_run(bool_v1, false, quant_arc, name, symbols, is_colo, exchange_params).await;
         },
         "kucoin_spot" => {
-            kucoin_spot_run(bool_v1, 0i8, quant_arc, name, symbols, exchange_params).await;
+            kucoin_spot_run(bool_v1, false, quant_arc, name, symbols, is_colo, exchange_params).await;
         },
         "bitget_spot" => {
-            bitget_spot_run(bool_v1, 0i8, quant_arc, name, symbols, exchange_params).await;
+            bitget_spot_run(bool_v1, false, quant_arc, name, symbols, is_colo, exchange_params).await;
         },
         _ => {
             let msg = format!("不支持的参考交易所:{}", exchange_name);