use tracing::{info, warn}; use crate::data_providers::{deserialize_string_to_f32, SpawnExecutor}; use crate::data_providers::deserialize_string_to_i64; use std::collections::HashMap; use std::vec; use bytes::Bytes; use iced::{ stream, futures::{sink::SinkExt, Stream}, }; use regex::Regex; use serde_json::json; use serde_json::Value; use sonic_rs::{Deserialize, JsonValueTrait, Serialize}; use sonic_rs::to_object_iter_unchecked; use fastwebsockets::{Frame, FragmentCollector, OpCode}; use http_body_util::Empty; use hyper::header::{CONNECTION, UPGRADE}; use hyper::Request; use hyper::upgrade::Upgraded; use hyper_util::rt::TokioIo; use tokio::net::TcpStream; use crate::data_providers::{ Connection, Event, Kline, LocalDepthCache, MarketType, Order, State, StreamError, TickerInfo, TickerStats, Trade, VecLocalDepthCache, }; use crate::{Ticker, Timeframe}; use super::str_f32_parse; use super::OpenInterest; use tracing::{error}; #[derive(Serialize, Deserialize, Debug)] struct SonicDepth { #[serde(rename = "time")] pub update_id: u64, #[serde(rename = "bids")] pub bids: Vec, #[serde(rename = "asks")] pub asks: Vec, } #[derive(Serialize, Deserialize, Debug)] struct BidAsk { #[serde(rename = "0")] pub price: String, #[serde(rename = "1")] pub qty: String, } #[derive(Serialize, Deserialize, Debug)] struct SonicTrade { #[serde(rename = "time")] pub time: u64, #[serde(rename = "last_price")] pub price: String, #[serde(rename = "last_qty")] pub qty: String, #[serde(rename = "side")] pub side: String, } #[derive(Debug)] enum StreamData { Trade(Vec), Depth(SonicDepth, i64), // Kline(Ticker, Vec), } #[allow(unused_assignments)] fn feed_de( slice: &[u8] ) -> Result, StreamError> { // // 这里应该是做缓存,之前的bybit数据推送是增量的 // let mut depth_wrap: Option = None; let iter: sonic_rs::ObjectJsonIter = unsafe { to_object_iter_unchecked(slice) }; let mut trade = SonicTrade { time: 0, price: "".to_string(), qty: "".to_string(), side: "".to_string(), }; let mut depth = SonicDepth { update_id: 0, bids: vec![], asks: vec![], }; for elem in iter { let (k, v) = elem.map_err(|e| StreamError::ParseError(e.to_string()))?; // let v = &v.as_raw_faststr(); match k.as_str() { "time" => { let t: u64 = v.as_u64().unwrap(); trade.time = t; depth.update_id = t; } "last_price" => { trade.price = v.to_string(); } "last_qty" => { trade.qty = v.to_string(); } "side" => { trade.side = v.to_string(); } "asks" => { let asks = serde_json::from_slice::(v.as_raw_str().as_bytes()) .unwrap(); for ask in asks.as_array().unwrap() { let order_book = BidAsk { price: ask.as_array().unwrap()[0].to_string(), qty: ask.as_array().unwrap()[1].to_string(), }; depth.asks.push(order_book); } } "bids" => { let bids = serde_json::from_slice::(v.as_raw_str().as_bytes()) .unwrap(); for bid in bids.as_array().unwrap() { let order_book = BidAsk { price: bid.as_array().unwrap()[0].to_string(), qty: bid.as_array().unwrap()[1].to_string(), }; depth.bids.push(order_book); } } &_ => {} } } let update_id = depth.update_id; Ok(vec![StreamData::Trade(vec![trade]), StreamData::Depth(depth, update_id as i64)]) } async fn connect() -> Result>, StreamError> { let url = "ws://localhost:6789"; let addr = "localhost:6789"; let stream = TcpStream::connect(&addr).await .map_err(|e| StreamError::WebsocketError(e.to_string()))?; // 2. 构建 WebSocket 握手请求 let req = Request::builder() .method("GET") .uri(url) .header("Host", "localhost") .header(UPGRADE, "websocket") .header(CONNECTION, "upgrade") .header("Sec-WebSocket-Key", fastwebsockets::handshake::generate_key()) .header("Sec-WebSocket-Version", "13") .header("Sec-WebSocket-Protocol", "rust-websocket") // 可选协议 .header("User-Agent", "rust-client/1.0") // 添加 UA 头 .body(Empty::::new()) .map_err(|e| StreamError::WebsocketError(e.to_string()))?; let (ws, _) = fastwebsockets::handshake::client(&SpawnExecutor, req, stream) .await .map_err(|e| StreamError::WebsocketError(e.to_string()))?; Ok(FragmentCollector::new(ws)) } async fn try_connect( streams: &Value, output: &mut futures::channel::mpsc::Sender, ) -> State { match connect().await { Ok(mut websocket) => { if let Err(e) = websocket .write_frame(Frame::text(fastwebsockets::Payload::Borrowed( streams.to_string().as_bytes(), ))) .await { let _ = output .send(Event::Disconnected(format!("Failed subscribing: {e}"))) .await; return State::Disconnected; } let _ = output.send(Event::Connected(Connection)).await; State::Connected(websocket) } Err(err) => { tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; let _ = output .send(Event::Disconnected(format!("Failed to connect: {err}"))) .await; State::Disconnected } } } pub fn connect_market_stream(ticker: Ticker) -> impl Stream { stream::channel(100, move |mut output| async move { let mut state: State = State::Disconnected; let (symbol_str, market_type) = ticker.get_string(); let stream_1 = format!("publicTrade.{symbol_str}"); let stream_2 = format!( "orderbook.{}.{}", match market_type { MarketType::Spot => "200", MarketType::LinearPerps => "500", }, symbol_str, ); let subscribe_message = json!({ "op": "subscribe", "args": [stream_1, stream_2] }); let mut trades_buffer: Vec = Vec::new(); let mut orderbook = LocalDepthCache::new(); loop { match &mut state { State::Disconnected => { state = try_connect( &subscribe_message, &mut output ).await; } State::Connected(websocket) => match websocket.read_frame().await { Ok(msg) => match msg.opcode { OpCode::Text => { let result = feed_de(&msg.payload[..]); match result { Ok(data_vec) => { let trade_handle_rst = &data_vec[0]; let depth_handle_rst = &data_vec[1]; match trade_handle_rst { StreamData::Trade(de_trade_vec) => { for de_trade in de_trade_vec { let trade = Trade { time: de_trade.time as i64, is_sell: de_trade.side == "sell", price: str_f32_parse(&de_trade.price), qty: str_f32_parse(&de_trade.qty), }; trades_buffer.push(trade); } } StreamData::Depth(_, _) => {} } match depth_handle_rst { StreamData::Depth(de_depth, time) => { let t = time.clone(); let depth_update = VecLocalDepthCache { last_update_id: de_depth.update_id as i64, time: t, bids: de_depth .bids .iter() .map(|x| Order { price: str_f32_parse(&x.price), qty: str_f32_parse(&x.qty), }) .collect(), asks: de_depth .asks .iter() .map(|x| Order { price: str_f32_parse(&x.price), qty: str_f32_parse(&x.qty), }) .collect(), }; orderbook.fetched(&depth_update); } StreamData::Trade(_) => {} } } Err(e) => { // 处理错误 error!("处理数据失败: {}", e); } } } OpCode::Close => { state = State::Disconnected; let _ = output .send(Event::Disconnected("Connection closed".to_string())) .await; } _ => {} }, Err(e) => { state = State::Disconnected; let _ = output .send(Event::Disconnected( "Error reading frame: ".to_string() + &e.to_string(), )) .await; } }, } } }) } // fn string_to_timeframe(interval: &str) -> Option { // Timeframe::ALL // .iter() // .find(|&tf| tf.to_minutes().to_string() == interval) // .copied() // } #[derive(Debug, Clone, Copy, PartialEq, Deserialize)] #[serde(rename_all = "camelCase")] struct DeOpenInterest { #[serde(rename = "openInterest", deserialize_with = "deserialize_string_to_f32")] pub value: f32, #[serde(deserialize_with = "deserialize_string_to_i64")] pub timestamp: i64, } pub async fn fetch_historical_oi( ticker: Ticker, range: Option<(i64, i64)>, period: Timeframe, ) -> Result, StreamError> { let ticker_str = ticker.get_string().0.to_uppercase(); let period_str = match period { Timeframe::M5 => "5min", Timeframe::M15 => "15min", Timeframe::M30 => "30min", Timeframe::H1 => "1h", Timeframe::H2 => "2h", Timeframe::H4 => "4h", _ => { let err_msg = format!("Unsupported timeframe for open interest: {}", period); error!("{}", err_msg); return Err(StreamError::UnknownError(err_msg)); } }; let mut url = format!( "https://api.bybit.com/v5/market/open-interest?category=linear&symbol={}&intervalTime={}", ticker_str, period_str, ); if let Some((start, end)) = range { let interval_ms = period.to_milliseconds() as i64; let num_intervals = ((end - start) / interval_ms).min(200); if num_intervals > 1 { url.push_str(&format!( "&startTime={start}&endTime={end}&limit={num_intervals}" )); } else { url.push_str("&limit=200"); } } else { url.push_str("&limit=200"); } let response = reqwest::get(&url) .await .map_err(|e| { error!("Failed to fetch from {}: {}", url, e); StreamError::FetchError(e) })?; let text = response.text() .await .map_err(|e| { error!("Failed to get response text from {}: {}", url, e); StreamError::FetchError(e) })?; let content: Value = sonic_rs::from_str(&text) .map_err(|e| { error!("Failed to parse JSON from {}: {}\nResponse: {}", url, e, text); StreamError::ParseError(e.to_string()) })?; let result_list = content["result"]["list"] .as_array() .ok_or_else(|| { error!("Result list is not an array in response: {}", text); StreamError::ParseError("Result list is not an array".to_string()) })?; let bybit_oi: Vec = serde_json::from_value(json!(result_list)) .map_err(|e| { error!("Failed to parse open interest array: {}\nResponse: {}", e, text); StreamError::ParseError(format!("Failed to parse open interest: {e}")) })?; let open_interest: Vec = bybit_oi .into_iter() .map(|x| OpenInterest { time: x.timestamp, value: x.value, }) .collect(); if open_interest.is_empty() { warn!("No open interest data found for {}, from url: {}", ticker_str, url); } Ok(open_interest) } #[allow(dead_code)] #[derive(Deserialize, Debug)] struct ApiResponse { #[serde(rename = "retCode")] ret_code: u32, #[serde(rename = "retMsg")] ret_msg: String, result: ApiResult, } #[allow(dead_code)] #[derive(Deserialize, Debug)] struct ApiResult { symbol: String, category: String, list: Vec>, } pub async fn fetch_klines( ticker: Ticker, timeframe: Timeframe, range: Option<(i64, i64)>, ) -> Result, StreamError> { let (symbol_str, market_type) = &ticker.get_string(); let timeframe_str = timeframe.to_minutes().to_string(); fn parse_kline_field(field: Option<&str>) -> Result { field .ok_or_else(|| StreamError::ParseError("Failed to parse kline".to_string())) .and_then(|s| { s.parse::() .map_err(|_| StreamError::ParseError("Failed to parse kline".to_string())) }) } let market = match market_type { MarketType::Spot => "spot", MarketType::LinearPerps => "linear", }; let mut url = format!( "https://api.bybit.com/v5/market/kline?category={}&symbol={}&interval={}", market, symbol_str.to_uppercase(), timeframe_str ); if let Some((start, end)) = range { let interval_ms = timeframe.to_milliseconds() as i64; let num_intervals = ((end - start) / interval_ms).min(1000); url.push_str(&format!("&start={start}&end={end}&limit={num_intervals}")); } else { url.push_str(&format!("&limit={}", 200)); } let response: reqwest::Response = reqwest::get(&url).await.map_err(StreamError::FetchError)?; let text = response.text().await.map_err(StreamError::FetchError)?; let api_response: ApiResponse = sonic_rs::from_str(&text).map_err(|e| StreamError::ParseError(e.to_string()))?; let klines: Result, StreamError> = api_response .result .list .iter() .map(|kline| { let time = parse_kline_field::(kline[0].as_str())?; let open = parse_kline_field::(kline[1].as_str())?; let high = parse_kline_field::(kline[2].as_str())?; let low = parse_kline_field::(kline[3].as_str())?; let close = parse_kline_field::(kline[4].as_str())?; let volume = parse_kline_field::(kline[5].as_str())?; Ok(Kline { time, open, high, low, close, volume: (-1.0, volume), }) }) .collect(); klines } pub async fn fetch_ticksize(market_type: MarketType) -> Result>, StreamError> { let market = match market_type { MarketType::Spot => "spot", MarketType::LinearPerps => "linear", }; let url = format!("https://api.bybit.com/v5/market/instruments-info?category={market}"); let response = reqwest::get(&url).await.map_err(StreamError::FetchError)?; let text = response.text().await.map_err(StreamError::FetchError)?; let exchange_info: Value = sonic_rs::from_str(&text).map_err(|e| StreamError::ParseError(e.to_string()))?; let result_list: &Vec = exchange_info["result"]["list"] .as_array() .ok_or_else(|| StreamError::ParseError("Result list is not an array".to_string()))?; let mut ticker_info_map = HashMap::new(); let re = Regex::new(r"^[a-zA-Z0-9]+$").unwrap(); for item in result_list { let symbol = item["symbol"] .as_str() .ok_or_else(|| StreamError::ParseError("Symbol not found".to_string()))?; if !re.is_match(symbol) { continue; } let price_filter = item["priceFilter"] .as_object() .ok_or_else(|| StreamError::ParseError("Price filter not found".to_string()))?; let min_ticksize = price_filter["tickSize"] .as_str() .ok_or_else(|| StreamError::ParseError("Tick size not found".to_string()))? .parse::() .map_err(|_| StreamError::ParseError("Failed to parse tick size".to_string()))?; let ticker = Ticker::new(symbol, market_type); ticker_info_map.insert(ticker, Some(TickerInfo { min_ticksize, ticker })); } Ok(ticker_info_map) } pub async fn fetch_ticker_prices(market_type: MarketType) -> Result, StreamError> { let market = match market_type { MarketType::Spot => "spot", MarketType::LinearPerps => "linear", }; let url = format!("https://api.bybit.com/v5/market/tickers?category={market}"); let response = reqwest::get(&url).await.map_err(StreamError::FetchError)?; let text = response.text().await.map_err(StreamError::FetchError)?; let exchange_info: Value = sonic_rs::from_str(&text).map_err(|e| StreamError::ParseError(e.to_string()))?; let result_list: &Vec = exchange_info["result"]["list"] .as_array() .ok_or_else(|| StreamError::ParseError("Result list is not an array".to_string()))?; let mut ticker_prices_map = HashMap::new(); let re = Regex::new(r"^[a-zA-Z0-9]+$").unwrap(); for item in result_list { let symbol = item["symbol"] .as_str() .ok_or_else(|| StreamError::ParseError("Symbol not found".to_string()))?; if !re.is_match(symbol) { continue; } let mark_price = item["lastPrice"] .as_str() .ok_or_else(|| StreamError::ParseError("Mark price not found".to_string()))? .parse::() .map_err(|_| StreamError::ParseError("Failed to parse mark price".to_string()))?; let daily_price_chg = item["price24hPcnt"] .as_str() .ok_or_else(|| StreamError::ParseError("Daily price change not found".to_string()))? .parse::() .map_err(|_| { StreamError::ParseError("Failed to parse daily price change".to_string()) })?; let daily_volume = item["volume24h"] .as_str() .ok_or_else(|| StreamError::ParseError("Daily volume not found".to_string()))? .parse::() .map_err(|_| StreamError::ParseError("Failed to parse daily volume".to_string()))?; let quote_volume = daily_volume * mark_price; if quote_volume < 4_000_000.0 { continue; } let ticker_stats = TickerStats { mark_price, daily_price_chg: daily_price_chg * 100.0, daily_volume: quote_volume, }; ticker_prices_map.insert(Ticker::new(symbol, market_type), ticker_stats); } Ok(ticker_prices_map) }