Browse Source

买一卖一搜集程序改造

JiahengHe 8 tháng trước cách đây
mục cha
commit
095f5821e1
4 tập tin đã thay đổi với 266 bổ sung32 xóa
  1. 1 0
      Cargo.toml
  2. 17 10
      global/src/log_utils.rs
  3. 206 0
      src/gate_binance_data_listener.rs
  4. 42 22
      src/main.rs

+ 1 - 0
Cargo.toml

@@ -25,4 +25,5 @@ rust_decimal_macros = "1.32.0"
 futures-util = { version = "0.3.28", default-features = false, features = ["sink", "std"] }
 futures-channel = "0.3.28"
 lazy_static = "1.4.0"
+reqwest = { version = "0.11.14", features = ["json"] }
 

+ 17 - 10
global/src/log_utils.rs

@@ -1,6 +1,7 @@
+use std::collections::HashMap;
 use std::fmt::Debug;
 use std::io;
-use tracing::{Event, Subscriber, warn};
+use tracing::{Event, info, Subscriber, warn};
 use tracing_appender_timezone::non_blocking::WorkerGuard;
 use tracing_subscriber::{fmt, Layer};
 use tracing_subscriber::layer::{Context, SubscriberExt};
@@ -23,11 +24,11 @@ impl Visit for ErrorMessageVisitor {
 
 // 错误报告发送到指定服务器
 struct ReportingLayer {
-    app_name: String,
+    account_name: String,
 }
 impl<S> Layer<S> for ReportingLayer
-    where
-        S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
+where
+    S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
 {
     fn on_event(&self, event: &Event<'_>, _ctx: Context<'_, S>) {
         if event.metadata().level() == &tracing::Level::ERROR {
@@ -36,11 +37,17 @@ impl<S> Layer<S> for ReportingLayer
             };
             event.record(&mut visitor);
 
-            warn!("Unhandle error: {}", self.app_name)
+            let msg = format!("account={}, type=error, msg={}", self.account_name.clone(), visitor.message);
+            info!(msg)
+            // send_remote_err_log(msg)
         }
     }
 }
 
+pub fn send_remote_err_log(_msg: String) {
+
+}
+
 pub fn init_log_with_debug() {
     let _ = final_init(tracing::Level::DEBUG.as_str(), 0, "test".to_string());
 }
@@ -53,9 +60,10 @@ pub fn init_log_with_info() {
     let _ = final_init(tracing::Level::INFO.as_str(), 0, "test".to_string());
 }
 
-pub fn final_init(level: &str, _port: u32, app_name: String) -> WorkerGuard {
+pub fn final_init(level: &str, port: u32, account_name: String) -> WorkerGuard {
     let mut path = String::new();
     path.push_str("./logs");
+    path.push_str(port.to_string().as_str());
 
     let file_appender = RollingFileAppender::builder()
         .time_zone(8)
@@ -74,15 +82,14 @@ pub fn final_init(level: &str, _port: u32, app_name: String) -> WorkerGuard {
 
     let fmt_layer = fmt::layer()
         .with_timer(local_time.clone())
-        .with_line_number(true)
         .with_target(true)
+        .with_line_number(true)
         .with_level(true)
         .with_writer(io::stdout)
         .with_span_events(fmt::format::FmtSpan::FULL);
 
     let file_layer = fmt::layer()
         .with_timer(local_time.clone())
-        .with_line_number(true)
         .with_target(true)
         .with_ansi(false)
         .with_level(true)
@@ -90,7 +97,7 @@ pub fn final_init(level: &str, _port: u32, app_name: String) -> WorkerGuard {
         .with_span_events(fmt::format::FmtSpan::FULL);
 
     let reporting_layer = ReportingLayer {
-        app_name
+        account_name
     };
 
     let layer = tracing_subscriber::Registry::default()
@@ -101,5 +108,5 @@ pub fn final_init(level: &str, _port: u32, app_name: String) -> WorkerGuard {
 
     tracing::subscriber::set_global_default(layer).unwrap();
 
-    return guard;
+    guard
 }

+ 206 - 0
src/gate_binance_data_listener.rs

@@ -0,0 +1,206 @@
+use std::collections::{BTreeMap, HashMap, HashSet};
+use std::sync::Arc;
+use std::sync::atomic::AtomicBool;
+use exchanges::binance_swap_rest::BinanceSwapRest;
+use exchanges::binance_swap_ws::{BinanceSwapSubscribeType, BinanceSwapWs, BinanceSwapWsType};
+use exchanges::gate_swap_rest::GateSwapRest;
+use exchanges::gate_swap_ws::{GateSwapSubscribeType, GateSwapWs, GateSwapWsType};
+use exchanges::response_base::ResponseData;
+use serde::Serialize;
+use tokio::sync::Mutex;
+use tracing::info;
+
+
+#[derive(Debug, Clone, Serialize)]
+pub struct A1B1Info {
+    // 交易所 binance
+    pub source: String,
+    // 合约还是现货 swap spot
+    pub b_type: String,
+    // 币对
+    pub coin: String,
+    // 时间戳 秒级
+    pub time: i64,
+    // 买1价
+    pub bid: String,
+    // 卖1价
+    pub ask: String
+}
+
+pub async fn run_listener(is_shutdown_arc: Arc<AtomicBool>, data_arc: Arc<Mutex<HashMap<String, A1B1Info>>>) {
+    // 获取两个交易所共有的币种
+    let binance_symbols = get_binance_swap_symbols().await;
+    let gate_symbols = get_gate_swap_symbols().await;
+    let set1: HashSet<_> = binance_symbols.into_iter().collect();
+    let set2: HashSet<_> = gate_symbols.into_iter().collect();
+    // 获取两个交易所共有的币种
+    let symbols: Vec<String> = set1.intersection(&set2).cloned().collect();
+
+    // 币安ws订阅启动
+    for chunk in symbols.chunks(20) {
+        let ws_name = "binance_usdt_swap_listener1".to_string();
+        let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
+        let write_tx_am = Arc::new(Mutex::new(write_tx));
+        let symbols_chunk = chunk.iter().cloned().collect::<Vec<String>>();
+        let is_shutdown_clone = Arc::clone(&is_shutdown_arc);
+        let data_arc_binance_clone_for = Arc::clone(&data_arc);
+
+        tokio::spawn(async move {
+            let mut ws = BinanceSwapWs::new_with_tag(ws_name, false, None, BinanceSwapWsType::PublicAndPrivate);
+            ws.set_subscribe(vec![
+                BinanceSwapSubscribeType::PuBookTicker
+            ]);
+
+            let data_arc_binance_clone = Arc::clone(&data_arc_binance_clone_for);
+
+            let fun = move |data: ResponseData| {
+                let data_arc_cc = data_arc_binance_clone.clone();
+                async move {
+                    binance_data_listener(data,data_arc_cc).await
+                }
+            };
+
+            // 建立链接
+            ws.set_symbols(symbols_chunk);
+            ws.ws_connect_async(is_shutdown_clone, fun, &write_tx_am, write_rx).await.expect("binance链接失败(内部一个心跳线程应该已经关闭了)");
+        });
+    }
+
+    // gate ws订阅启动
+    for chunk in symbols.chunks(20) {
+        let ws_name = "gate_usdt_swap_listener1".to_string();
+        let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
+        let write_tx_am = Arc::new(Mutex::new(write_tx));
+        let symbols_chunk = chunk.iter().cloned().collect::<Vec<String>>();
+        let is_shutdown_clone = Arc::clone(&is_shutdown_arc);
+        let data_arc_gate_clone_for = Arc::clone(&data_arc);
+
+        tokio::spawn(async move {
+            let mut ws = GateSwapWs::new_with_tag(ws_name, false, None, GateSwapWsType::PublicAndPrivate("usdt".to_string()));
+            ws.set_subscribe(vec![
+                GateSwapSubscribeType::PuFuturesBookTicker
+            ]);
+            let data_arc_gate_clone = Arc::clone(&data_arc_gate_clone_for);
+
+            let fun = move |data: ResponseData| {
+                let data_arc_cc = data_arc_gate_clone.clone();
+                async move {
+                    gate_data_listener(data,data_arc_cc).await
+                }
+            };
+
+            // 建立链接
+            ws.set_symbols(symbols_chunk);
+            ws.ws_connect_async(is_shutdown_clone, fun, &write_tx_am, write_rx).await.expect("gate链接失败(内部一个心跳线程应该已经关闭了)");
+        });
+    }
+}
+
+pub async fn get_binance_swap_symbols() -> Vec<String> {
+    // 订阅所有币种
+    let login = BTreeMap::new();
+    let mut binance_rest = BinanceSwapRest::new(false, login);
+    let response = binance_rest.get_exchange_info().await;
+    let mut symbols = vec![];
+    if response.code == 200 {
+        let data = response.data["symbols"].as_array().unwrap();
+        for info in data {
+            let s = info["symbol"].as_str().unwrap().to_string();
+            if !s.ends_with("USDT") {
+                continue
+            }
+            let symbol = s.replace("USDT", "_USDT");
+            symbols.push(symbol)
+        }
+    }
+    // XXX_USDT
+    symbols
+}
+
+pub async fn get_gate_swap_symbols() -> Vec<String> {
+    // 币种
+    let login = BTreeMap::new();
+    let mut gate_rest = GateSwapRest::new(false, login);
+    let response = gate_rest.get_market_details("usdt".to_string()).await;
+    let mut symbols = vec![];
+    if response.code == 200 {
+        let symbol_infos = response.data.as_array().unwrap();
+        for symbol_info in symbol_infos {
+            let symbol = symbol_info["name"].as_str().unwrap().to_string();
+            symbols.push(symbol)
+        }
+    }
+    // XXX_USDT
+    symbols
+}
+
+async fn binance_data_listener(response: ResponseData, data_arc_cc: Arc<Mutex<HashMap<String, A1B1Info>>>) {
+    if response.code != 200 {
+        return;
+    }
+
+    match response.channel.as_str() {
+        "bookTicker" => {
+            // 买一卖一数据
+            let a1b1_info = binance_handle_book_ticker(&response);
+            data_arc_cc.lock().await.insert(format!("{}{}{}", a1b1_info.source, a1b1_info.coin, a1b1_info.time), a1b1_info);
+        },
+        _ => {
+            info!("150 binance未知的数据类型: {:?}", response)
+        }
+    }
+}
+
+// 读取数据
+async fn gate_data_listener(response: ResponseData, data_arc_cc: Arc<Mutex<HashMap<String, A1B1Info>>>) {
+    if response.code != 200 {
+        return;
+    }
+
+    match response.channel.as_str() {
+        // 深度数据
+        "futures.book_ticker" => {
+            let a1b1_info = gate_handle_book_ticker(&response);
+            data_arc_cc.lock().await.insert(format!("{}{}{}", a1b1_info.source, a1b1_info.coin, a1b1_info.time), a1b1_info);
+        }
+        _ => {
+            info!("168 gate未知的数据类型: {:?}", response)
+        }
+    }
+}
+
+fn gate_handle_book_ticker(res_data: &ResponseData) -> A1B1Info {
+    // 获取秒级
+    let create_at = &res_data.data["t"].as_i64().unwrap() / 1000;
+    let symbol = res_data.data["s"].as_str().unwrap().to_string().replace("USDT", "_USDT");
+    // 取卖一
+    let ap = res_data.data["a"].as_str().unwrap().to_string();
+    // 取买一
+    let bp = res_data.data["b"].as_str().unwrap().to_string();
+
+    A1B1Info {
+        source: "gate".to_string(),
+        b_type: "swap".to_string(),
+        time: create_at,
+        bid: bp.to_string(),
+        coin: symbol,
+        ask: ap.to_string(),
+    }
+}
+
+
+fn binance_handle_book_ticker(res_data: &ResponseData) -> A1B1Info {
+    let bp = (*res_data).data["b"].as_str().unwrap();
+    let ap = (*res_data).data["a"].as_str().unwrap();
+    // 获取秒级
+    let create_at = (*res_data).data["E"].as_i64().unwrap()/1000;
+    let symbol = (*res_data).data["s"].as_str().unwrap().to_string().replace("USDT", "_USDT");
+    A1B1Info {
+        source: "binance".to_string(),
+        b_type: "swap".to_string(),
+        time: create_at,
+        bid: bp.to_string(),
+        coin: symbol,
+        ask: ap.to_string(),
+    }
+}

+ 42 - 22
src/main.rs

@@ -19,12 +19,18 @@ mod cointr_usdt_swap_data_listener;
 mod gate_usdt_spot_data_listener;
 mod gate_coin_spot_data_listener;
 mod bybit_usdt_swap_data_listener;
+mod gate_binance_data_listener;
 
+use std::collections::HashMap;
 use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::time::Duration;
+use tokio::sync::Mutex;
+use reqwest::{Client, Error, Response};
+use serde_json::json;
 use tracing::{info, warn};
 use tracing_appender_timezone::non_blocking::WorkerGuard;
+use crate::gate_binance_data_listener::A1B1Info;
 
 // 日志级别配置
 fn log_level_init(log_str: String, port: u32, app_name: String) -> WorkerGuard {
@@ -35,38 +41,52 @@ fn log_level_init(log_str: String, port: u32, app_name: String) -> WorkerGuard {
 #[tokio::main(flavor = "multi_thread")]
 async fn main() {
     // 日志级别配置
-    let _ = log_level_init("info".to_string(), 8888, "data-center".to_string());
+    let _ = log_level_init("info".to_string(), 8888, "a1_b1_data-center".to_string());
     // 掌控全局的关闭
     let running = Arc::new(AtomicBool::new(true));
-    // 初始化数据服务器
-    server::run_server(8888, running.clone());
+
     // ctrl c退出检查程序
     control_c::exit_handler(running.clone());
+    // 交易所买一卖一收集 key:(交易所,币对,时间戳(秒)), value:数据结构为:A1B1Info
+    let data_map: HashMap<String, A1B1Info> = HashMap::new();
+    let data_arc = Arc::new(Mutex::new(data_map));
+    info!("初始化完成!");
     // 启动各交易所的数据监听器
-    // okx_usdt_swap_data_listener::run_listener(running.clone()).await;
-    // kucoin_usdt_swap_data_listener::run_listener(running.clone()).await;
-    // bitmart_usdt_swap_data_listener::run_listener(running.clone()).await;
-    // bingx_usdt_swap_data_listener::run_listener(running.clone()).await;
-    // coinsph_usdt_swap_data_listener::run_listener(running.clone()).await;
-    // woo_usdt_swap_data_listener::run_listener(running.clone()).await;
-    // cointr_usdt_swap_data_listener::run_listener(running.clone()).await;
-    // htx_usdt_swap_data_listener::run_listener(running.clone()).await;
-    // gate_usdt_spot_data_listener::run_listener(running.clone()).await;
+    gate_binance_data_listener::run_listener(running.clone(), data_arc.clone()).await;
+
+    tokio::spawn(async move {
+        loop {
+            tokio::time::sleep(Duration::from_secs(1)).await;
+            let send_data;
+            let mut data = data_arc.lock().await;
+            {
+                // 克隆数据,清空原容器
+                send_data = data.clone();
+                data.clear();
+            }
+            let result: Vec<A1B1Info> = send_data.into_iter().map(|(_, v)| v).collect();
+
+
+            let res = Client::new().post("http://localhost:82/priceCollect/api/add")
+                .header("auth", "4L")
+                .header("Content-Type", "application/json")
+                .body(json!(result).to_string())
+                .send()
+                .await;
+
+            match res {
+                Err(err) => {
+                    warn!("log的error监听器发送远端报错失败:{:?}", err);
+                }
+                _ => {}
+            }
+        }
+    });
 
-    binance_usdt_swap_data_listener::run_listener(running.clone()).await;
-    gate_usdt_swap_data_listener::run_listener(running.clone()).await;
-    coinex_usdt_swap_data_listener::run_listener(running.clone()).await;
-    // phemex_usdt_swap_data_listener::run_listener(running.clone()).await;
-    // gate_coin_spot_data_listener::run_listener(running.clone()).await;
-    // mexc_usdt_swap_data_listener::run_listener(running.clone()).await;
-    bybit_usdt_swap_data_listener::run_listener(running.clone()).await;
-    bitget_usdt_swap_data_listener::run_listener(running.clone()).await;
     // panic错误捕获,panic级别的错误直接退出
-    // let panic_running = running.clone();
     std::panic::set_hook(Box::new(move |panic_info| {
         let msg = format!("type=panic, msg={:?}, location={:?}", panic_info.to_string(), panic_info.location());
         warn!("{}", msg);
-        // panic_running.store(false, Ordering::Relaxed);
     }));
     // 每一秒检查一次程序是否结束
     while running.load(Ordering::Relaxed) {