فهرست منبع

optimize error handling for websockets and logging

Berke 1 سال پیش
والد
کامیت
6fc9781349
4فایلهای تغییر یافته به همراه104 افزوده شده و 53 حذف شده
  1. 53 21
      src/data_providers/binance/market_data.rs
  2. 36 25
      src/data_providers/bybit/market_data.rs
  3. 2 1
      src/logger.rs
  4. 13 6
      src/main.rs

+ 53 - 21
src/data_providers/binance/market_data.rs

@@ -36,7 +36,7 @@ enum State {
 #[derive(Debug, Clone)]
 pub enum Event {
     Connected(Connection),
-    Disconnected,
+    Disconnected(String),
     DepthReceived(Ticker, FeedLatency, i64, Depth, Vec<Trade>),
     KlineReceived(Ticker, Kline, Timeframe),
 }
@@ -348,7 +348,10 @@ pub fn connect_market_stream(ticker: Ticker) -> impl Stream<Item = Event> {
                                             asks: depth.asks,
                                         }
                                     },
-                                    Err(_) => return,
+                                    Err(e) => {
+                                        log::error!("Failed to fetch depth for {}, error: {}", symbol_str, e);
+                                        return;
+                                    }
                                 };
 
                                 let _ = tx.send(depth);
@@ -356,15 +359,22 @@ pub fn connect_market_stream(ticker: Ticker) -> impl Stream<Item = Event> {
                             match rx.await {
                                 Ok(depth) => {
                                     orderbook.fetched(depth);
+
                                     state = State::Connected(websocket);
+                                    let _ = output.send(Event::Connected(Connection)).await;                                 
                                 },
-                                Err(_) => output.send(Event::Disconnected).await.expect("Trying to send disconnect event..."),
+                                Err(e) => {
+                                    let _ = output.send(Event::Disconnected(
+                                        format!("Failed to send fetched depth for {}, error: {}", symbol_str, e)
+                                    )).await.expect("Trying to send disconnect event...");
+                                }
                             }
-                            
                         } else {
-                            tokio::time::sleep(tokio::time::Duration::from_secs(1))
-                           .await;
-                           let _ = output.send(Event::Disconnected).await;
+                            tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
+
+                            let _ = output.send(Event::Disconnected(
+                                "Failed to connect to websocket".to_string()
+                            )).await;
                         }
                     },
                     State::Connected(ws) => {
@@ -422,7 +432,10 @@ pub fn connect_market_stream(ticker: Ticker) -> impl Stream<Item = Event> {
                                                                     asks: depth.asks,
                                                                 }
                                                             },
-                                                            Err(_) => return,
+                                                            Err(e) => {
+                                                                log::error!("Failed to fetch depth for {}, error: {}", symbol_str, e);
+                                                                return;
+                                                            }
                                                         };
     
                                                         let _ = tx.send(depth);
@@ -431,10 +444,12 @@ pub fn connect_market_stream(ticker: Ticker) -> impl Stream<Item = Event> {
                                                         Ok(depth) => {
                                                             orderbook.fetched(depth)
                                                         },
-                                                        Err(_) => {
+                                                        Err(e) => {
                                                             state = State::Disconnected;
-                                                            output.send(Event::Disconnected).await.expect("Trying to send disconnect event...");
-                                                        },
+                                                            let _ = output.send(Event::Disconnected(
+                                                                format!("Failed to send fetched depth for {}, error: {}", symbol_str, e)
+                                                            )).await.expect("Trying to send disconnect event...");
+                                                        }
                                                     }
                                                     already_fetching = false;
                                                 }
@@ -492,13 +507,18 @@ pub fn connect_market_stream(ticker: Ticker) -> impl Stream<Item = Event> {
                                     }
                                 }
                                 OpCode::Close => {
-                                    log::error!("Connection closed");
-                                    let _ = output.send(Event::Disconnected).await;
+                                    state = State::Disconnected;
+                                    let _ = output.send(
+                                        Event::Disconnected("Connection closed".to_string())
+                                    ).await;
                                 }
                                 _ => {}
                             },
-                            Err(e) => {
-                                log::error!("Error reading frame: {}", e);
+                            Err(e) => {    
+                                state = State::Disconnected;           
+                                let _ = output.send(
+                                    Event::Disconnected("Error reading frame: ".to_string() + &e.to_string())
+                                ).await;
                             }
                         };
                     }
@@ -542,11 +562,14 @@ pub fn connect_kline_stream(streams: Vec<(Ticker, Timeframe)>) -> impl Stream<It
                             domain, streams
                         )
                         .await {
-                           state = State::Connected(websocket);
+                            state = State::Connected(websocket);
+                            let _ = output.send(Event::Connected(Connection)).await;        
                         } else {
-                            tokio::time::sleep(tokio::time::Duration::from_secs(1))
-                           .await;
-                           let _ = output.send(Event::Disconnected).await;
+                            tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
+
+                            let _ = output.send(Event::Disconnected(
+                                "Failed to connect to websocket".to_string()
+                            )).await;
                         }
                     },
                     State::Connected(ws) => {
@@ -575,10 +598,19 @@ pub fn connect_kline_stream(streams: Vec<(Ticker, Timeframe)>) -> impl Stream<It
                                         log::error!("\nUnknown data: {:?}", &json_bytes);
                                     }
                                 }
+                                OpCode::Close => {
+                                    state = State::Disconnected;
+                                    let _ = output.send(
+                                        Event::Disconnected("Connection closed".to_string())
+                                    ).await;
+                                }
                                 _ => {}
                             }, 
-                            Err(e) => {
-                                log::error!("Error reading frame: {}", e);
+                            Err(e) => {      
+                                state = State::Disconnected;        
+                                let _ = output.send(
+                                    Event::Disconnected("Error reading frame: ".to_string() + &e.to_string())
+                                ).await;  
                             }
                         }
                     }

+ 36 - 25
src/data_providers/bybit/market_data.rs

@@ -1,13 +1,12 @@
 use iced::{stream, futures};
 use futures::sink::SinkExt;
-use futures::stream::{Stream, StreamExt};
+use futures::stream::Stream;
 
 use serde_json::Value;
 use bytes::Bytes;
 
-use sonic_rs::{JsonValueTrait};
-use sonic_rs::{Deserialize, Serialize}; 
-use sonic_rs::{to_object_iter_unchecked};
+use sonic_rs::{JsonValueTrait, Deserialize, Serialize}; 
+use sonic_rs::to_object_iter_unchecked;
 
 use anyhow::anyhow;
 use anyhow::{Context, Result};
@@ -36,7 +35,7 @@ enum State {
 #[derive(Debug, Clone)]
 pub enum Event {
     Connected(Connection),
-    Disconnected,
+    Disconnected(String),
     DepthReceived(Ticker, FeedLatency, i64, Depth, Vec<Trade>),
     KlineReceived(Ticker, Kline, Timeframe),
 }
@@ -343,19 +342,21 @@ pub fn connect_market_stream(ticker: Ticker) -> impl Stream<Item = Event> {
                             }).to_string();
     
                             if let Err(e) = websocket.write_frame(Frame::text(fastwebsockets::Payload::Borrowed(subscribe_message.as_bytes()))).await {
-                                log::error!("Failed subscribing: {}", e);
-
-                                let _ = output.send(Event::Disconnected).await;
+                                let _ = output.send(Event::Disconnected(
+                                    format!("Failed subscribing: {}", e)
+                                )).await;
 
                                 continue;
                             }
 
                             state = State::Connected(websocket);
-                            
+                            let _ = output.send(Event::Connected(Connection)).await; 
                         } else {
                             tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
 
-                            let _ = output.send(Event::Disconnected).await;
+                            let _ = output.send(Event::Disconnected(
+                                "Failed to connect to websocket".to_string()
+                            )).await;
                         }
                     },
                     State::Connected(websocket) => {
@@ -433,15 +434,20 @@ pub fn connect_market_stream(ticker: Ticker) -> impl Stream<Item = Event> {
                                             }
                                         }
                                     }
-                                },
+                                }
                                 OpCode::Close => {
-                                    log::error!("Connection closed");
-                                    let _ = output.send(Event::Disconnected).await;
-                                },
+                                    state = State::Disconnected;
+                                    let _ = output.send(
+                                        Event::Disconnected("Connection closed".to_string())
+                                    ).await;
+                                }
                                 _ => {}
                             },
                             Err(e) => {
-                                log::error!("Error reading frame: {}", e);
+                                state = State::Disconnected;        
+                                let _ = output.send(
+                                    Event::Disconnected("Error reading frame: ".to_string() + &e.to_string())
+                                ).await;
                             }
                         }
                     }
@@ -489,19 +495,21 @@ pub fn connect_kline_stream(streams: Vec<(Ticker, Timeframe)>) -> impl Stream<It
                             }).to_string();
     
                             if let Err(e) = websocket.write_frame(Frame::text(fastwebsockets::Payload::Borrowed(subscribe_message.as_bytes()))).await {
-                                log::error!("Failed subscribing: {}", e);
-
-                                let _ = output.send(Event::Disconnected).await;
+                                let _ = output.send(Event::Disconnected
+                                    (format!("Failed subscribing: {}", e))
+                                ).await;
 
                                 continue;
-                            } 
+                            }
 
                             state = State::Connected(websocket);
-                            
+                            let _ = output.send(Event::Connected(Connection)).await;
                         } else {
-                            tokio::time::sleep(tokio::time::Duration::from_secs(1))
-                           .await;
-                           let _ = output.send(Event::Disconnected).await;
+                            tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
+
+                            let _ = output.send(Event::Disconnected(
+                                "Failed to connect to websocket".to_string()
+                            )).await;
                         }
                     }
                     State::Connected(websocket) => {
@@ -534,8 +542,11 @@ pub fn connect_kline_stream(streams: Vec<(Ticker, Timeframe)>) -> impl Stream<It
                                 }
                                 _ => {}
                             },
-                            Err(e) => {
-                                log::error!("Error reading frame: {}", e);
+                            Err(e) => {   
+                                state = State::Disconnected;             
+                                let _ = output.send(
+                                    Event::Disconnected("Error reading frame: ".to_string() + &e.to_string())
+                                ).await;
                             }
                         }
                     }

+ 2 - 1
src/logger.rs

@@ -55,7 +55,8 @@ fn monitor_file_size(file_path: &str, max_size_bytes: u64) {
                 }
             }
             Err(err) => {
-                eprintln!("Error reading file metadata: {}", err);
+                eprintln!("Error reading log file metadata: {}", err);
+                process::exit(1);
             }
         }
         std::thread::sleep(std::time::Duration::from_secs(30));

+ 13 - 6
src/main.rs

@@ -6,6 +6,7 @@ mod style;
 mod screen;
 mod logger;
 
+use hyper::client::conn;
 use style::{ICON_FONT, ICON_BYTES, Icon};
 
 use screen::{Notification, Error};
@@ -444,6 +445,12 @@ impl State {
 
                 match event {
                     MarketEvents::Binance(event) => match event {
+                        binance::market_data::Event::Connected(connection) => {
+                            log::info!("a stream connected to Binance WS: {connection:?}");
+                        }
+                        binance::market_data::Event::Disconnected(event) => {
+                            log::info!("a stream disconnected from Binance WS: {event:?}");
+                        }
                         binance::market_data::Event::DepthReceived(ticker, feed_latency, depth_update_t, depth, trades_buffer) => {                            
                             let stream_type = StreamType::DepthAndTrades {
                                 exchange: Exchange::BinanceFutures,
@@ -479,11 +486,14 @@ impl State {
                                     .remove(&stream_type);
                             }
                         }
-                        _ => {
-                            log::warn!("{event:?}");
-                        }
                     },
                     MarketEvents::Bybit(event) => match event {
+                        bybit::market_data::Event::Connected(_) => {
+                            log::info!("a stream connected to Bybit WS");
+                        }
+                        bybit::market_data::Event::Disconnected(event) => {
+                            log::info!("a stream disconnected from Bybit WS: {event:?}");
+                        }
                         bybit::market_data::Event::DepthReceived(ticker, feed_latency, depth_update_t, depth, trades_buffer) => {
                             let stream_type = StreamType::DepthAndTrades {
                                 exchange: Exchange::BybitLinear,
@@ -519,9 +529,6 @@ impl State {
                                     .remove(&stream_type);
                             }
                         }
-                        _ => {
-                            log::warn!("{event:?}");
-                        }
                     },
                 }