|
|
@@ -1,3 +1,4 @@
|
|
|
+use tracing::warn;
|
|
|
use crate::data_providers::deserialize_string_to_f32;
|
|
|
use crate::data_providers::deserialize_string_to_i64;
|
|
|
use std::collections::HashMap;
|
|
|
@@ -28,6 +29,8 @@ use crate::{Ticker, Timeframe};
|
|
|
use super::str_f32_parse;
|
|
|
use super::OpenInterest;
|
|
|
|
|
|
+use tracing::{error};
|
|
|
+
|
|
|
#[derive(Serialize, Deserialize, Debug)]
|
|
|
struct SonicDepth {
|
|
|
#[serde(rename = "u")]
|
|
|
@@ -58,29 +61,29 @@ struct SonicTrade {
|
|
|
pub is_sell: String,
|
|
|
}
|
|
|
|
|
|
-#[derive(Deserialize, Debug, Clone)]
|
|
|
-pub struct SonicKline {
|
|
|
- #[serde(rename = "start")]
|
|
|
- pub time: u64,
|
|
|
- #[serde(rename = "open")]
|
|
|
- pub open: String,
|
|
|
- #[serde(rename = "high")]
|
|
|
- pub high: String,
|
|
|
- #[serde(rename = "low")]
|
|
|
- pub low: String,
|
|
|
- #[serde(rename = "close")]
|
|
|
- pub close: String,
|
|
|
- #[serde(rename = "volume")]
|
|
|
- pub volume: String,
|
|
|
- #[serde(rename = "interval")]
|
|
|
- pub interval: String,
|
|
|
-}
|
|
|
+// #[derive(Deserialize, Debug, Clone)]
|
|
|
+// pub struct SonicKline {
|
|
|
+// #[serde(rename = "start")]
|
|
|
+// pub time: u64,
|
|
|
+// #[serde(rename = "open")]
|
|
|
+// pub open: String,
|
|
|
+// #[serde(rename = "high")]
|
|
|
+// pub high: String,
|
|
|
+// #[serde(rename = "low")]
|
|
|
+// pub low: String,
|
|
|
+// #[serde(rename = "close")]
|
|
|
+// pub close: String,
|
|
|
+// #[serde(rename = "volume")]
|
|
|
+// pub volume: String,
|
|
|
+// #[serde(rename = "interval")]
|
|
|
+// pub interval: String,
|
|
|
+// }
|
|
|
|
|
|
#[derive(Debug)]
|
|
|
enum StreamData {
|
|
|
Trade(Vec<SonicTrade>),
|
|
|
Depth(SonicDepth, String, i64),
|
|
|
- Kline(Ticker, Vec<SonicKline>),
|
|
|
+ // Kline(Ticker, Vec<SonicKline>),
|
|
|
}
|
|
|
|
|
|
#[derive(Debug)]
|
|
|
@@ -126,7 +129,7 @@ fn feed_de(
|
|
|
let mut depth_wrap: Option<SonicDepth> = None;
|
|
|
|
|
|
let mut data_type = String::new();
|
|
|
- let mut topic_ticker = Ticker::default();
|
|
|
+ // let mut topic_ticker = Ticker::default();
|
|
|
|
|
|
let iter: sonic_rs::ObjectJsonIter = unsafe { to_object_iter_unchecked(slice) };
|
|
|
|
|
|
@@ -142,23 +145,23 @@ fn feed_de(
|
|
|
}
|
|
|
|
|
|
match StreamName::from_topic(val, is_ticker, market_type) {
|
|
|
- StreamName::Depth(ticker) => {
|
|
|
+ StreamName::Depth(_ticker) => {
|
|
|
stream_type = Some(StreamWrapper::Depth);
|
|
|
|
|
|
- topic_ticker = ticker;
|
|
|
+ // topic_ticker = ticker;
|
|
|
}
|
|
|
- StreamName::Trade(ticker) => {
|
|
|
+ StreamName::Trade(_ticker) => {
|
|
|
stream_type = Some(StreamWrapper::Trade);
|
|
|
|
|
|
- topic_ticker = ticker;
|
|
|
+ // topic_ticker = ticker;
|
|
|
}
|
|
|
- StreamName::Kline(ticker) => {
|
|
|
+ StreamName::Kline(_ticker) => {
|
|
|
stream_type = Some(StreamWrapper::Kline);
|
|
|
|
|
|
- topic_ticker = ticker;
|
|
|
+ // topic_ticker = ticker;
|
|
|
}
|
|
|
_ => {
|
|
|
- log::error!("Unknown stream name");
|
|
|
+ error!("Unknown stream name");
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
@@ -185,14 +188,14 @@ fn feed_de(
|
|
|
.map_err(|e| StreamError::ParseError(e.to_string()))?,
|
|
|
);
|
|
|
}
|
|
|
- Some(StreamWrapper::Kline) => {
|
|
|
- let kline_wrap: Vec<SonicKline> = sonic_rs::from_str(&v.as_raw_faststr())
|
|
|
- .map_err(|e| StreamError::ParseError(e.to_string()))?;
|
|
|
-
|
|
|
- return Ok(StreamData::Kline(topic_ticker, kline_wrap));
|
|
|
- }
|
|
|
+ // Some(StreamWrapper::Kline) => {
|
|
|
+ // let kline_wrap: Vec<SonicKline> = sonic_rs::from_str(&v.as_raw_faststr())
|
|
|
+ // .map_err(|e| StreamError::ParseError(e.to_string()))?;
|
|
|
+ //
|
|
|
+ // return Ok(StreamData::Kline(topic_ticker, kline_wrap));
|
|
|
+ // }
|
|
|
_ => {
|
|
|
- log::error!("Unknown stream type");
|
|
|
+ error!("Unknown stream type");
|
|
|
}
|
|
|
}
|
|
|
} else if k == "cts" {
|
|
|
@@ -210,27 +213,19 @@ fn feed_de(
|
|
|
}
|
|
|
|
|
|
async fn connect(
|
|
|
- domain: &str,
|
|
|
- market_type: MarketType
|
|
|
+ domain: &str,
|
|
|
) -> Result<FragmentCollector<TokioIo<Upgraded>>, StreamError> {
|
|
|
let tcp_stream = setup_tcp_connection(domain).await?;
|
|
|
let tls_stream = setup_tls_connection(domain, tcp_stream).await?;
|
|
|
- let url = format!(
|
|
|
- "wss://stream.bybit.com/v5/public/{}",
|
|
|
- match market_type {
|
|
|
- MarketType::Spot => "spot",
|
|
|
- MarketType::LinearPerps => "linear",
|
|
|
- }
|
|
|
- );
|
|
|
+ let url = "ws://127.0.0.1:6789";
|
|
|
setup_websocket_connection(domain, tls_stream, &url).await
|
|
|
}
|
|
|
|
|
|
async fn try_connect(
|
|
|
streams: &Value,
|
|
|
- market_type: MarketType,
|
|
|
output: &mut futures::channel::mpsc::Sender<Event>,
|
|
|
) -> State {
|
|
|
- match connect("stream.bybit.com", market_type).await {
|
|
|
+ match connect("127.0.0.1:6789").await {
|
|
|
Ok(mut websocket) => {
|
|
|
if let Err(e) = websocket
|
|
|
.write_frame(Frame::text(fastwebsockets::Payload::Borrowed(
|
|
|
@@ -274,7 +269,7 @@ pub fn connect_market_stream(ticker: Ticker) -> impl Stream<Item = Event> {
|
|
|
symbol_str,
|
|
|
);
|
|
|
|
|
|
- let subscribe_message = serde_json::json!({
|
|
|
+ let subscribe_message = json!({
|
|
|
"op": "subscribe",
|
|
|
"args": [stream_1, stream_2]
|
|
|
});
|
|
|
@@ -286,8 +281,7 @@ pub fn connect_market_stream(ticker: Ticker) -> impl Stream<Item = Event> {
|
|
|
match &mut state {
|
|
|
State::Disconnected => {
|
|
|
state = try_connect(
|
|
|
- &subscribe_message,
|
|
|
- market_type,
|
|
|
+ &subscribe_message,
|
|
|
&mut output
|
|
|
).await;
|
|
|
}
|
|
|
@@ -347,9 +341,6 @@ pub fn connect_market_stream(ticker: Ticker) -> impl Stream<Item = Event> {
|
|
|
.await;
|
|
|
}
|
|
|
}
|
|
|
- _ => {
|
|
|
- log::warn!("Unknown data: {:?}", &data);
|
|
|
- }
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
@@ -375,94 +366,12 @@ pub fn connect_market_stream(ticker: Ticker) -> impl Stream<Item = Event> {
|
|
|
})
|
|
|
}
|
|
|
|
|
|
-pub fn connect_kline_stream(
|
|
|
- streams: Vec<(Ticker, Timeframe)>,
|
|
|
- market_type: MarketType
|
|
|
-) -> impl Stream<Item = Event> {
|
|
|
- stream::channel(100, move |mut output| async move {
|
|
|
- let mut state = State::Disconnected;
|
|
|
-
|
|
|
- let stream_str = streams
|
|
|
- .iter()
|
|
|
- .map(|(ticker, timeframe)| {
|
|
|
- let timeframe_str = timeframe.to_minutes().to_string();
|
|
|
- format!("kline.{timeframe_str}.{}", ticker.get_string().0)
|
|
|
- })
|
|
|
- .collect::<Vec<String>>();
|
|
|
-
|
|
|
- let subscribe_message = serde_json::json!({
|
|
|
- "op": "subscribe",
|
|
|
- "args": stream_str
|
|
|
- });
|
|
|
-
|
|
|
- loop {
|
|
|
- match &mut state {
|
|
|
- State::Disconnected => {
|
|
|
- state = try_connect(
|
|
|
- &subscribe_message,
|
|
|
- market_type,
|
|
|
- &mut output
|
|
|
- ).await;
|
|
|
- }
|
|
|
- State::Connected(websocket) => match websocket.read_frame().await {
|
|
|
- Ok(msg) => match msg.opcode {
|
|
|
- OpCode::Text => {
|
|
|
- if let Ok(StreamData::Kline(ticker, de_kline_vec)) =
|
|
|
- feed_de(&msg.payload[..], None, market_type)
|
|
|
- {
|
|
|
- for de_kline in &de_kline_vec {
|
|
|
- let kline = Kline {
|
|
|
- time: de_kline.time,
|
|
|
- open: str_f32_parse(&de_kline.open),
|
|
|
- high: str_f32_parse(&de_kline.high),
|
|
|
- low: str_f32_parse(&de_kline.low),
|
|
|
- close: str_f32_parse(&de_kline.close),
|
|
|
- volume: (-1.0, str_f32_parse(&de_kline.volume)),
|
|
|
- };
|
|
|
-
|
|
|
- if let Some(timeframe) = string_to_timeframe(&de_kline.interval)
|
|
|
- {
|
|
|
- let _ = output
|
|
|
- .send(Event::KlineReceived(ticker, kline, timeframe))
|
|
|
- .await;
|
|
|
- } else {
|
|
|
- log::error!(
|
|
|
- "Failed to find timeframe: {}, {:?}",
|
|
|
- &de_kline.interval,
|
|
|
- streams
|
|
|
- );
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- OpCode::Close => {
|
|
|
- state = State::Disconnected;
|
|
|
- let _ = output
|
|
|
- .send(Event::Disconnected("Connection closed".to_string()))
|
|
|
- .await;
|
|
|
- }
|
|
|
- _ => {}
|
|
|
- }
|
|
|
- Err(e) => {
|
|
|
- state = State::Disconnected;
|
|
|
- let _ = output
|
|
|
- .send(Event::Disconnected(
|
|
|
- "Error reading frame: ".to_string() + &e.to_string(),
|
|
|
- ))
|
|
|
- .await;
|
|
|
- }
|
|
|
- },
|
|
|
- }
|
|
|
- }
|
|
|
- })
|
|
|
-}
|
|
|
-
|
|
|
-fn string_to_timeframe(interval: &str) -> Option<Timeframe> {
|
|
|
- Timeframe::ALL
|
|
|
- .iter()
|
|
|
- .find(|&tf| tf.to_minutes().to_string() == interval)
|
|
|
- .copied()
|
|
|
-}
|
|
|
+// fn string_to_timeframe(interval: &str) -> Option<Timeframe> {
|
|
|
+// Timeframe::ALL
|
|
|
+// .iter()
|
|
|
+// .find(|&tf| tf.to_minutes().to_string() == interval)
|
|
|
+// .copied()
|
|
|
+// }
|
|
|
|
|
|
#[derive(Debug, Clone, Copy, PartialEq, Deserialize)]
|
|
|
#[serde(rename_all = "camelCase")]
|
|
|
@@ -488,7 +397,7 @@ pub async fn fetch_historical_oi(
|
|
|
Timeframe::H4 => "4h",
|
|
|
_ => {
|
|
|
let err_msg = format!("Unsupported timeframe for open interest: {}", period);
|
|
|
- log::error!("{}", err_msg);
|
|
|
+ error!("{}", err_msg);
|
|
|
return Err(StreamError::UnknownError(err_msg));
|
|
|
}
|
|
|
};
|
|
|
@@ -516,33 +425,33 @@ pub async fn fetch_historical_oi(
|
|
|
let response = reqwest::get(&url)
|
|
|
.await
|
|
|
.map_err(|e| {
|
|
|
- log::error!("Failed to fetch from {}: {}", url, e);
|
|
|
+ error!("Failed to fetch from {}: {}", url, e);
|
|
|
StreamError::FetchError(e)
|
|
|
})?;
|
|
|
|
|
|
let text = response.text()
|
|
|
.await
|
|
|
.map_err(|e| {
|
|
|
- log::error!("Failed to get response text from {}: {}", url, e);
|
|
|
+ error!("Failed to get response text from {}: {}", url, e);
|
|
|
StreamError::FetchError(e)
|
|
|
})?;
|
|
|
|
|
|
let content: Value = sonic_rs::from_str(&text)
|
|
|
.map_err(|e| {
|
|
|
- log::error!("Failed to parse JSON from {}: {}\nResponse: {}", url, e, text);
|
|
|
+ error!("Failed to parse JSON from {}: {}\nResponse: {}", url, e, text);
|
|
|
StreamError::ParseError(e.to_string())
|
|
|
})?;
|
|
|
|
|
|
let result_list = content["result"]["list"]
|
|
|
.as_array()
|
|
|
.ok_or_else(|| {
|
|
|
- log::error!("Result list is not an array in response: {}", text);
|
|
|
+ error!("Result list is not an array in response: {}", text);
|
|
|
StreamError::ParseError("Result list is not an array".to_string())
|
|
|
})?;
|
|
|
|
|
|
let bybit_oi: Vec<DeOpenInterest> = serde_json::from_value(json!(result_list))
|
|
|
.map_err(|e| {
|
|
|
- log::error!("Failed to parse open interest array: {}\nResponse: {}", e, text);
|
|
|
+ error!("Failed to parse open interest array: {}\nResponse: {}", e, text);
|
|
|
StreamError::ParseError(format!("Failed to parse open interest: {e}"))
|
|
|
})?;
|
|
|
|
|
|
@@ -555,7 +464,7 @@ pub async fn fetch_historical_oi(
|
|
|
.collect();
|
|
|
|
|
|
if open_interest.is_empty() {
|
|
|
- log::warn!("No open interest data found for {}, from url: {}", ticker_str, url);
|
|
|
+ warn!("No open interest data found for {}, from url: {}", ticker_str, url);
|
|
|
}
|
|
|
|
|
|
Ok(open_interest)
|