| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637 |
- use tracing::warn;
- use crate::data_providers::{deserialize_string_to_f32, SpawnExecutor};
- use crate::data_providers::deserialize_string_to_i64;
- use std::collections::HashMap;
- use std::vec;
- use bytes::Bytes;
- use iced::{
- stream,
- futures::{sink::SinkExt, Stream},
- };
- use regex::Regex;
- use serde_json::json;
- use serde_json::Value;
- use sonic_rs::{Deserialize, JsonValueTrait, Serialize};
- use sonic_rs::to_object_iter_unchecked;
- use fastwebsockets::{Frame, FragmentCollector, OpCode};
- use http_body_util::Empty;
- use hyper::header::{CONNECTION, UPGRADE};
- use hyper::Request;
- use hyper::upgrade::Upgraded;
- use hyper_util::rt::TokioIo;
- use tokio::net::TcpStream;
- use crate::data_providers::{
- Connection, Event, Kline, LocalDepthCache, MarketType, Order, State,
- StreamError, TickerInfo, TickerStats, Trade, VecLocalDepthCache,
- };
- use crate::{Ticker, Timeframe};
- use super::str_f32_parse;
- use super::OpenInterest;
- use tracing::error;
- #[derive(Serialize, Deserialize, Debug)]
- struct SonicDepth {
- #[serde(rename = "time")]
- pub update_id: u64,
- #[serde(rename = "bids")]
- pub bids: Vec<BidAsk>,
- #[serde(rename = "asks")]
- pub asks: Vec<BidAsk>,
- }
- #[derive(Serialize, Deserialize, Debug)]
- struct BidAsk {
- #[serde(rename = "0")]
- pub price: String,
- #[serde(rename = "1")]
- pub qty: String,
- }
- #[derive(Serialize, Deserialize, Debug)]
- struct SonicTrade {
- #[serde(rename = "time")]
- pub time: u64,
- #[serde(rename = "last_price")]
- pub price: String,
- #[serde(rename = "last_qty")]
- pub qty: String,
- #[serde(rename = "side")]
- pub side: String,
- }
- #[derive(Debug)]
- enum StreamData {
- Trade(Vec<SonicTrade>),
- Depth(SonicDepth, i64),
- // Kline(Ticker, Vec<SonicKline>),
- }
- #[allow(unused_assignments)]
- fn feed_de(
- slice: &[u8]
- ) -> Result<Vec<StreamData>, StreamError> {
- // // 这里应该是做缓存,之前的bybit数据推送是增量的
- // let mut depth_wrap: Option<SonicDepth> = None;
- let iter: sonic_rs::ObjectJsonIter = unsafe { to_object_iter_unchecked(slice) };
- let mut trade = SonicTrade {
- time: 0,
- price: "".to_string(),
- qty: "".to_string(),
- side: "".to_string(),
- };
- let mut depth = SonicDepth {
- update_id: 0,
- bids: vec![],
- asks: vec![],
- };
- for elem in iter {
- let (k, v) = elem.map_err(|e| StreamError::ParseError(e.to_string()))?;
- // let v = &v.as_raw_faststr();
- match k.as_str() {
- "time" => {
- let t: u64 = v.as_u64().unwrap();
- trade.time = t;
- depth.update_id = t;
- }
- "last_price" => {
- trade.price = v.to_string();
- }
- "last_qty" => {
- trade.qty = v.to_string();
- }
- "side" => {
- trade.side = v.to_string();
- }
- "asks" => {
- let asks = serde_json::from_slice::<Value>(v.as_raw_str().as_bytes())
- .unwrap();
- for ask in asks.as_array().unwrap() {
- let order_book = BidAsk {
- price: ask.as_array().unwrap()[0].to_string(),
- qty: ask.as_array().unwrap()[1].to_string(),
- };
- depth.asks.push(order_book);
- }
- }
- "bids" => {
- let bids = serde_json::from_slice::<Value>(v.as_raw_str().as_bytes())
- .unwrap();
- for bid in bids.as_array().unwrap() {
- let order_book = BidAsk {
- price: bid.as_array().unwrap()[0].to_string(),
- qty: bid.as_array().unwrap()[1].to_string(),
- };
- depth.bids.push(order_book);
- }
- }
- &_ => {}
- }
- }
- let update_id = depth.update_id;
- Ok(vec![StreamData::Trade(vec![trade]), StreamData::Depth(depth, update_id as i64)])
- }
- async fn connect() -> Result<FragmentCollector<TokioIo<Upgraded>>, StreamError> {
- let url = "ws://localhost:6789";
- let addr = "localhost:6789";
- let stream = TcpStream::connect(&addr).await
- .map_err(|e| StreamError::WebsocketError(e.to_string()))?;
- // 2. 构建 WebSocket 握手请求
- let req = Request::builder()
- .method("GET")
- .uri(url)
- .header("Host", "localhost")
- .header(UPGRADE, "websocket")
- .header(CONNECTION, "upgrade")
- .header("Sec-WebSocket-Key", fastwebsockets::handshake::generate_key())
- .header("Sec-WebSocket-Version", "13")
- .header("Sec-WebSocket-Protocol", "rust-websocket") // 可选协议
- .header("User-Agent", "rust-client/1.0") // 添加 UA 头
- .body(Empty::<Bytes>::new())
- .map_err(|e| StreamError::WebsocketError(e.to_string()))?;
- let (ws, _) = fastwebsockets::handshake::client(&SpawnExecutor, req, stream)
- .await
- .map_err(|e| StreamError::WebsocketError(e.to_string()))?;
- Ok(FragmentCollector::new(ws))
- }
- async fn try_connect(
- streams: &Value,
- output: &mut futures::channel::mpsc::Sender<Event>,
- ) -> State {
- match connect().await {
- Ok(mut websocket) => {
- if let Err(e) = websocket
- .write_frame(Frame::text(fastwebsockets::Payload::Borrowed(
- streams.to_string().as_bytes(),
- )))
- .await
- {
- let _ = output
- .send(Event::Disconnected(format!("Failed subscribing: {e}")))
- .await;
- return State::Disconnected;
- }
- let _ = output.send(Event::Connected(Connection)).await;
- State::Connected(websocket)
- }
- Err(err) => {
- tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
- let _ = output
- .send(Event::Disconnected(format!("Failed to connect: {err}")))
- .await;
- State::Disconnected
- }
- }
- }
- pub fn connect_market_stream(ticker: Ticker) -> impl Stream<Item = Event> {
- stream::channel(100, move |mut output| async move {
- let mut state: State = State::Disconnected;
- let (symbol_str, market_type) = ticker.get_string();
- let stream_1 = format!("publicTrade.{symbol_str}");
- let stream_2 = format!(
- "orderbook.{}.{}",
- match market_type {
- MarketType::Spot => "200",
- MarketType::LinearPerps => "500",
- },
- symbol_str,
- );
- let subscribe_message = json!({
- "op": "subscribe",
- "args": [stream_1, stream_2]
- });
- let mut trades_buffer: Vec<Trade> = Vec::new();
- let mut orderbook = LocalDepthCache::new();
- loop {
- match &mut state {
- State::Disconnected => {
- state = try_connect(
- &subscribe_message,
- &mut output
- ).await;
- }
- State::Connected(websocket) => match websocket.read_frame().await {
- Ok(msg) => match msg.opcode {
- OpCode::Text => {
- let result = feed_de(&msg.payload[..]);
- match result {
- Ok(data_vec) => {
- let trade_handle_rst = &data_vec[0];
- let depth_handle_rst = &data_vec[1];
- match trade_handle_rst {
- StreamData::Trade(de_trade_vec) => {
- for de_trade in de_trade_vec {
- let trade = Trade {
- time: de_trade.time as i64,
- is_sell: de_trade.side == "sell",
- price: str_f32_parse(&de_trade.price),
- qty: str_f32_parse(&de_trade.qty),
- };
- trades_buffer.push(trade);
- }
- }
- StreamData::Depth(_, _) => {}
- }
- match depth_handle_rst {
- StreamData::Depth(de_depth, time) => {
- let t = time.clone();
- let depth_update = VecLocalDepthCache {
- last_update_id: de_depth.update_id as i64,
- time: t,
- bids: de_depth
- .bids
- .iter()
- .map(|x| Order {
- price: str_f32_parse(&x.price),
- qty: str_f32_parse(&x.qty),
- })
- .collect(),
- asks: de_depth
- .asks
- .iter()
- .map(|x| Order {
- price: str_f32_parse(&x.price),
- qty: str_f32_parse(&x.qty),
- })
- .collect(),
- };
- orderbook.fetched(&depth_update);
- let _ = output
- .send(Event::DepthReceived(
- ticker,
- t,
- orderbook.get_depth(),
- std::mem::take(&mut trades_buffer).into_boxed_slice(),
- ))
- .await;
- }
- StreamData::Trade(_) => {}
- }
- }
- Err(e) => {
- // 处理错误
- error!("处理数据失败: {}", e);
- }
- }
- }
- OpCode::Close => {
- state = State::Disconnected;
- let _ = output
- .send(Event::Disconnected("Connection closed".to_string()))
- .await;
- }
- _ => {}
- },
- Err(e) => {
- state = State::Disconnected;
- let _ = output
- .send(Event::Disconnected(
- "Error reading frame: ".to_string() + &e.to_string(),
- ))
- .await;
- }
- },
- }
- }
- })
- }
- // fn string_to_timeframe(interval: &str) -> Option<Timeframe> {
- // Timeframe::ALL
- // .iter()
- // .find(|&tf| tf.to_minutes().to_string() == interval)
- // .copied()
- // }
- #[derive(Debug, Clone, Copy, PartialEq, Deserialize)]
- #[serde(rename_all = "camelCase")]
- struct DeOpenInterest {
- #[serde(rename = "openInterest", deserialize_with = "deserialize_string_to_f32")]
- pub value: f32,
- #[serde(deserialize_with = "deserialize_string_to_i64")]
- pub timestamp: i64,
- }
- pub async fn fetch_historical_oi(
- ticker: Ticker,
- range: Option<(i64, i64)>,
- period: Timeframe,
- ) -> Result<Vec<OpenInterest>, StreamError> {
- let ticker_str = ticker.get_string().0.to_uppercase();
- let period_str = match period {
- Timeframe::M5 => "5min",
- Timeframe::M15 => "15min",
- Timeframe::M30 => "30min",
- Timeframe::H1 => "1h",
- Timeframe::H2 => "2h",
- Timeframe::H4 => "4h",
- _ => {
- let err_msg = format!("Unsupported timeframe for open interest: {}", period);
- error!("{}", err_msg);
- return Err(StreamError::UnknownError(err_msg));
- }
- };
- let mut url = format!(
- "https://api.bybit.com/v5/market/open-interest?category=linear&symbol={}&intervalTime={}",
- ticker_str, period_str,
- );
- if let Some((start, end)) = range {
- let interval_ms = period.to_milliseconds() as i64;
- let num_intervals = ((end - start) / interval_ms).min(200);
- if num_intervals > 1 {
- url.push_str(&format!(
- "&startTime={start}&endTime={end}&limit={num_intervals}"
- ));
- } else {
- url.push_str("&limit=200");
- }
- } else {
- url.push_str("&limit=200");
- }
- let response = reqwest::get(&url)
- .await
- .map_err(|e| {
- error!("Failed to fetch from {}: {}", url, e);
- StreamError::FetchError(e)
- })?;
-
- let text = response.text()
- .await
- .map_err(|e| {
- error!("Failed to get response text from {}: {}", url, e);
- StreamError::FetchError(e)
- })?;
- let content: Value = sonic_rs::from_str(&text)
- .map_err(|e| {
- error!("Failed to parse JSON from {}: {}\nResponse: {}", url, e, text);
- StreamError::ParseError(e.to_string())
- })?;
- let result_list = content["result"]["list"]
- .as_array()
- .ok_or_else(|| {
- error!("Result list is not an array in response: {}", text);
- StreamError::ParseError("Result list is not an array".to_string())
- })?;
-
- let bybit_oi: Vec<DeOpenInterest> = serde_json::from_value(json!(result_list))
- .map_err(|e| {
- error!("Failed to parse open interest array: {}\nResponse: {}", e, text);
- StreamError::ParseError(format!("Failed to parse open interest: {e}"))
- })?;
- let open_interest: Vec<OpenInterest> = bybit_oi
- .into_iter()
- .map(|x| OpenInterest {
- time: x.timestamp,
- value: x.value,
- })
- .collect();
- if open_interest.is_empty() {
- warn!("No open interest data found for {}, from url: {}", ticker_str, url);
- }
- Ok(open_interest)
- }
- #[allow(dead_code)]
- #[derive(Deserialize, Debug)]
- struct ApiResponse {
- #[serde(rename = "retCode")]
- ret_code: u32,
- #[serde(rename = "retMsg")]
- ret_msg: String,
- result: ApiResult,
- }
- #[allow(dead_code)]
- #[derive(Deserialize, Debug)]
- struct ApiResult {
- symbol: String,
- category: String,
- list: Vec<Vec<Value>>,
- }
- pub async fn fetch_klines(
- ticker: Ticker,
- timeframe: Timeframe,
- range: Option<(i64, i64)>,
- ) -> Result<Vec<Kline>, StreamError> {
- let (symbol_str, market_type) = &ticker.get_string();
- let timeframe_str = timeframe.to_minutes().to_string();
- fn parse_kline_field<T: std::str::FromStr>(field: Option<&str>) -> Result<T, StreamError> {
- field
- .ok_or_else(|| StreamError::ParseError("Failed to parse kline".to_string()))
- .and_then(|s| {
- s.parse::<T>()
- .map_err(|_| StreamError::ParseError("Failed to parse kline".to_string()))
- })
- }
- let market = match market_type {
- MarketType::Spot => "spot",
- MarketType::LinearPerps => "linear",
- };
- let mut url = format!(
- "https://api.bybit.com/v5/market/kline?category={}&symbol={}&interval={}",
- market, symbol_str.to_uppercase(), timeframe_str
- );
- if let Some((start, end)) = range {
- let interval_ms = timeframe.to_milliseconds() as i64;
- let num_intervals = ((end - start) / interval_ms).min(1000);
- url.push_str(&format!("&start={start}&end={end}&limit={num_intervals}"));
- } else {
- url.push_str(&format!("&limit={}", 200));
- }
- let response: reqwest::Response = reqwest::get(&url).await.map_err(StreamError::FetchError)?;
- let text = response.text().await.map_err(StreamError::FetchError)?;
- let api_response: ApiResponse =
- sonic_rs::from_str(&text).map_err(|e| StreamError::ParseError(e.to_string()))?;
- let klines: Result<Vec<Kline>, StreamError> = api_response
- .result
- .list
- .iter()
- .map(|kline| {
- let time = parse_kline_field::<u64>(kline[0].as_str())?;
- let open = parse_kline_field::<f32>(kline[1].as_str())?;
- let high = parse_kline_field::<f32>(kline[2].as_str())?;
- let low = parse_kline_field::<f32>(kline[3].as_str())?;
- let close = parse_kline_field::<f32>(kline[4].as_str())?;
- let volume = parse_kline_field::<f32>(kline[5].as_str())?;
- Ok(Kline {
- time,
- open,
- high,
- low,
- close,
- volume: (-1.0, volume),
- })
- })
- .collect();
- klines
- }
- pub async fn fetch_ticksize(market_type: MarketType) -> Result<HashMap<Ticker, Option<TickerInfo>>, StreamError> {
- let market = match market_type {
- MarketType::Spot => "spot",
- MarketType::LinearPerps => "linear",
- };
- let url = format!("https://api.bybit.com/v5/market/instruments-info?category={market}");
- let response = reqwest::get(&url).await.map_err(StreamError::FetchError)?;
- let text = response.text().await.map_err(StreamError::FetchError)?;
- let exchange_info: Value =
- sonic_rs::from_str(&text).map_err(|e| StreamError::ParseError(e.to_string()))?;
- let result_list: &Vec<Value> = exchange_info["result"]["list"]
- .as_array()
- .ok_or_else(|| StreamError::ParseError("Result list is not an array".to_string()))?;
- let mut ticker_info_map = HashMap::new();
- let re = Regex::new(r"^[a-zA-Z0-9]+$").unwrap();
- for item in result_list {
- let symbol = item["symbol"]
- .as_str()
- .ok_or_else(|| StreamError::ParseError("Symbol not found".to_string()))?;
- if !re.is_match(symbol) {
- continue;
- }
- let price_filter = item["priceFilter"]
- .as_object()
- .ok_or_else(|| StreamError::ParseError("Price filter not found".to_string()))?;
- let min_ticksize = price_filter["tickSize"]
- .as_str()
- .ok_or_else(|| StreamError::ParseError("Tick size not found".to_string()))?
- .parse::<f32>()
- .map_err(|_| StreamError::ParseError("Failed to parse tick size".to_string()))?;
- let ticker = Ticker::new(symbol, market_type);
- ticker_info_map.insert(ticker, Some(TickerInfo { min_ticksize, ticker }));
- }
- Ok(ticker_info_map)
- }
- pub async fn fetch_ticker_prices(market_type: MarketType) -> Result<HashMap<Ticker, TickerStats>, StreamError> {
- let market = match market_type {
- MarketType::Spot => "spot",
- MarketType::LinearPerps => "linear",
- };
- let url = format!("https://api.bybit.com/v5/market/tickers?category={market}");
- let response = reqwest::get(&url).await.map_err(StreamError::FetchError)?;
- let text = response.text().await.map_err(StreamError::FetchError)?;
- let exchange_info: Value =
- sonic_rs::from_str(&text).map_err(|e| StreamError::ParseError(e.to_string()))?;
- let result_list: &Vec<Value> = exchange_info["result"]["list"]
- .as_array()
- .ok_or_else(|| StreamError::ParseError("Result list is not an array".to_string()))?;
- let mut ticker_prices_map = HashMap::new();
- let re = Regex::new(r"^[a-zA-Z0-9]+$").unwrap();
- for item in result_list {
- let symbol = item["symbol"]
- .as_str()
- .ok_or_else(|| StreamError::ParseError("Symbol not found".to_string()))?;
- if !re.is_match(symbol) {
- continue;
- }
- let mark_price = item["lastPrice"]
- .as_str()
- .ok_or_else(|| StreamError::ParseError("Mark price not found".to_string()))?
- .parse::<f32>()
- .map_err(|_| StreamError::ParseError("Failed to parse mark price".to_string()))?;
- let daily_price_chg = item["price24hPcnt"]
- .as_str()
- .ok_or_else(|| StreamError::ParseError("Daily price change not found".to_string()))?
- .parse::<f32>()
- .map_err(|_| {
- StreamError::ParseError("Failed to parse daily price change".to_string())
- })?;
- let daily_volume = item["volume24h"]
- .as_str()
- .ok_or_else(|| StreamError::ParseError("Daily volume not found".to_string()))?
- .parse::<f32>()
- .map_err(|_| StreamError::ParseError("Failed to parse daily volume".to_string()))?;
- let quote_volume = daily_volume * mark_price;
- if quote_volume < 4_000_000.0 {
- continue;
- }
- let ticker_stats = TickerStats {
- mark_price,
- daily_price_chg: daily_price_chg * 100.0,
- daily_volume: quote_volume,
- };
- ticker_prices_map.insert(Ticker::new(symbol, market_type), ticker_stats);
- }
- Ok(ticker_prices_map)
- }
|