bybit.rs 27 KB


  1. use crate::data_providers::deserialize_string_to_f32;
  2. use crate::data_providers::deserialize_string_to_i64;
  3. use std::collections::HashMap;
  4. use iced::{
  5. stream,
  6. futures::{sink::SinkExt, Stream},
  7. };
  8. use regex::Regex;
  9. use serde_json::json;
  10. use serde_json::Value;
  11. use sonic_rs::{JsonValueTrait, Deserialize, Serialize};
  12. use sonic_rs::to_object_iter_unchecked;
  13. use fastwebsockets::{Frame, FragmentCollector, OpCode};
  14. use hyper::upgrade::Upgraded;
  15. use hyper_util::rt::TokioIo;
  16. use crate::data_providers::{
  17. setup_tcp_connection, setup_tls_connection, setup_websocket_connection,
  18. Connection, Event, Kline, LocalDepthCache, MarketType, Order, State,
  19. StreamError, TickerInfo, TickerStats, Trade, VecLocalDepthCache,
  20. };
  21. use crate::{Ticker, Timeframe};
  22. use super::OpenInterest;
  23. #[derive(Serialize, Deserialize, Debug)]
  24. struct SonicDepth {
  25. #[serde(rename = "u")]
  26. pub update_id: u64,
  27. #[serde(rename = "b")]
  28. pub bids: Vec<BidAsk>,
  29. #[serde(rename = "a")]
  30. pub asks: Vec<BidAsk>,
  31. }
  32. #[derive(Serialize, Deserialize, Debug)]
  33. struct BidAsk {
  34. #[serde(rename = "0")]
  35. pub price: String,
  36. #[serde(rename = "1")]
  37. pub qty: String,
  38. }
  39. #[derive(Serialize, Deserialize, Debug)]
  40. struct SonicTrade {
  41. #[serde(rename = "T")]
  42. pub time: u64,
  43. #[serde(rename = "p")]
  44. pub price: String,
  45. #[serde(rename = "v")]
  46. pub qty: String,
  47. #[serde(rename = "S")]
  48. pub is_sell: String,
  49. }
  50. #[derive(Deserialize, Debug, Clone)]
  51. pub struct SonicKline {
  52. #[serde(rename = "start")]
  53. pub time: u64,
  54. #[serde(rename = "open")]
  55. pub open: String,
  56. #[serde(rename = "high")]
  57. pub high: String,
  58. #[serde(rename = "low")]
  59. pub low: String,
  60. #[serde(rename = "close")]
  61. pub close: String,
  62. #[serde(rename = "volume")]
  63. pub volume: String,
  64. #[serde(rename = "interval")]
  65. pub interval: String,
  66. }
  67. #[derive(Debug)]
  68. enum StreamData {
  69. Trade(Vec<SonicTrade>),
  70. Depth(SonicDepth, String, i64),
  71. Kline(Ticker, Vec<SonicKline>),
  72. }
  73. #[derive(Debug)]
  74. enum StreamName {
  75. Depth(Ticker),
  76. Trade(Ticker),
  77. Kline(Ticker),
  78. Unknown,
  79. }
  80. impl StreamName {
  81. fn from_topic(topic: &str, is_ticker: Option<Ticker>, market_type: MarketType) -> Self {
  82. let parts: Vec<&str> = topic.split('.').collect();
  83. if let Some(ticker_str) = parts.last() {
  84. let ticker = is_ticker.unwrap_or_else(|| Ticker::new(ticker_str, market_type));
  85. match parts.first() {
  86. Some(&"publicTrade") => StreamName::Trade(ticker),
  87. Some(&"orderbook") => StreamName::Depth(ticker),
  88. Some(&"kline") => StreamName::Kline(ticker),
  89. _ => StreamName::Unknown,
  90. }
  91. } else {
  92. StreamName::Unknown
  93. }
  94. }
  95. }
  96. #[derive(Debug)]
  97. enum StreamWrapper {
  98. Trade,
  99. Depth,
  100. Kline,
  101. }
  102. #[allow(unused_assignments)]
  103. fn feed_de(
  104. slice: &[u8],
  105. ticker: Option<Ticker>,
  106. market_type: MarketType
  107. ) -> Result<StreamData, StreamError> {
  108. let mut stream_type: Option<StreamWrapper> = None;
  109. let mut depth_wrap: Option<SonicDepth> = None;
  110. let mut data_type = String::new();
  111. let mut topic_ticker = Ticker::default();
  112. let iter: sonic_rs::ObjectJsonIter = unsafe { to_object_iter_unchecked(slice) };
  113. for elem in iter {
  114. let (k, v) = elem.map_err(|e| StreamError::ParseError(e.to_string()))?;
  115. if k == "topic" {
  116. if let Some(val) = v.as_str() {
  117. let mut is_ticker = None;
  118. if let Some(ticker) = ticker {
  119. is_ticker = Some(ticker);
  120. }
  121. match StreamName::from_topic(val, is_ticker, market_type) {
  122. StreamName::Depth(ticker) => {
  123. stream_type = Some(StreamWrapper::Depth);
  124. topic_ticker = ticker;
  125. }
  126. StreamName::Trade(ticker) => {
  127. stream_type = Some(StreamWrapper::Trade);
  128. topic_ticker = ticker;
  129. }
  130. StreamName::Kline(ticker) => {
  131. stream_type = Some(StreamWrapper::Kline);
  132. topic_ticker = ticker;
  133. }
  134. _ => {
  135. log::error!("Unknown stream name");
  136. }
  137. }
  138. }
  139. } else if k == "type" {
  140. v.as_str().unwrap().clone_into(&mut data_type);
  141. } else if k == "data" {
  142. match stream_type {
  143. Some(StreamWrapper::Trade) => {
  144. let trade_wrap: Vec<SonicTrade> = sonic_rs::from_str(&v.as_raw_faststr())
  145. .map_err(|e| StreamError::ParseError(e.to_string()))?;
  146. return Ok(StreamData::Trade(trade_wrap));
  147. }
  148. Some(StreamWrapper::Depth) => {
  149. if depth_wrap.is_none() {
  150. depth_wrap = Some(SonicDepth {
  151. update_id: 0,
  152. bids: Vec::new(),
  153. asks: Vec::new(),
  154. });
  155. }
  156. depth_wrap = Some(
  157. sonic_rs::from_str(&v.as_raw_faststr())
  158. .map_err(|e| StreamError::ParseError(e.to_string()))?,
  159. );
  160. }
  161. Some(StreamWrapper::Kline) => {
  162. let kline_wrap: Vec<SonicKline> = sonic_rs::from_str(&v.as_raw_faststr())
  163. .map_err(|e| StreamError::ParseError(e.to_string()))?;
  164. return Ok(StreamData::Kline(topic_ticker, kline_wrap));
  165. }
  166. _ => {
  167. log::error!("Unknown stream type");
  168. }
  169. }
  170. } else if k == "cts" {
  171. if let Some(dw) = depth_wrap {
  172. let time: u64 = v
  173. .as_u64()
  174. .ok_or_else(|| StreamError::ParseError("Failed to parse u64".to_string()))?;
  175. return Ok(StreamData::Depth(dw, data_type.to_string(), time as i64));
  176. }
  177. }
  178. }
  179. Err(StreamError::UnknownError("Unknown data".to_string()))
  180. }
  181. async fn connect(domain: &str, market_type: MarketType) -> Result<FragmentCollector<TokioIo<Upgraded>>, StreamError> {
  182. let tcp_stream = setup_tcp_connection(domain).await?;
  183. let tls_stream = setup_tls_connection(domain, tcp_stream).await?;
  184. let url = format!(
  185. "wss://stream.bybit.com/v5/public/{}",
  186. match market_type {
  187. MarketType::Spot => "spot",
  188. MarketType::LinearPerps => "linear",
  189. }
  190. );
  191. setup_websocket_connection(domain, tls_stream, &url).await
  192. }
  193. fn str_f32_parse(s: &str) -> f32 {
  194. s.parse::<f32>().unwrap_or_else(|e| {
  195. log::error!("Failed to parse float: {}, error: {}", s, e);
  196. 0.0
  197. })
  198. }
  199. fn string_to_timeframe(interval: &str) -> Option<Timeframe> {
  200. Timeframe::ALL
  201. .iter()
  202. .find(|&tf| tf.to_minutes().to_string() == interval)
  203. .copied()
  204. }
  205. pub fn connect_market_stream(ticker: Ticker) -> impl Stream<Item = Event> {
  206. stream::channel(100, move |mut output| async move {
  207. let mut state: State = State::Disconnected;
  208. let mut trades_buffer: Vec<Trade> = Vec::new();
  209. let (symbol_str, market_type) = ticker.get_string();
  210. let stream_1 = format!("publicTrade.{symbol_str}");
  211. let stream_2 = format!(
  212. "orderbook.{}.{}",
  213. match market_type {
  214. MarketType::Spot => "200",
  215. MarketType::LinearPerps => "500",
  216. },
  217. symbol_str,
  218. );
  219. let mut orderbook: LocalDepthCache = LocalDepthCache::new();
  220. loop {
  221. match &mut state {
  222. State::Disconnected => {
  223. let domain: &str = "stream.bybit.com";
  224. if let Ok(mut websocket) = connect(domain, market_type).await {
  225. let subscribe_message: String = serde_json::json!({
  226. "op": "subscribe",
  227. "args": [stream_1, stream_2]
  228. })
  229. .to_string();
  230. if let Err(e) = websocket
  231. .write_frame(Frame::text(fastwebsockets::Payload::Borrowed(
  232. subscribe_message.as_bytes(),
  233. )))
  234. .await
  235. {
  236. let _ = output
  237. .send(Event::Disconnected(format!("Failed subscribing: {e}")))
  238. .await;
  239. continue;
  240. }
  241. state = State::Connected(websocket);
  242. let _ = output.send(Event::Connected(Connection)).await;
  243. } else {
  244. tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
  245. let _ = output
  246. .send(Event::Disconnected(
  247. "Failed to connect to websocket".to_string(),
  248. ))
  249. .await;
  250. }
  251. }
  252. State::Connected(websocket) => match websocket.read_frame().await {
  253. Ok(msg) => match msg.opcode {
  254. OpCode::Text => {
  255. if let Ok(data) = feed_de(&msg.payload[..], Some(ticker), market_type) {
  256. match data {
  257. StreamData::Trade(de_trade_vec) => {
  258. for de_trade in &de_trade_vec {
  259. let trade = Trade {
  260. time: de_trade.time as i64,
  261. is_sell: de_trade.is_sell == "Sell",
  262. price: str_f32_parse(&de_trade.price),
  263. qty: str_f32_parse(&de_trade.qty),
  264. };
  265. trades_buffer.push(trade);
  266. }
  267. }
  268. StreamData::Depth(de_depth, data_type, time) => {
  269. let depth_update = VecLocalDepthCache {
  270. last_update_id: de_depth.update_id as i64,
  271. time,
  272. bids: de_depth
  273. .bids
  274. .iter()
  275. .map(|x| Order {
  276. price: str_f32_parse(&x.price),
  277. qty: str_f32_parse(&x.qty),
  278. })
  279. .collect(),
  280. asks: de_depth
  281. .asks
  282. .iter()
  283. .map(|x| Order {
  284. price: str_f32_parse(&x.price),
  285. qty: str_f32_parse(&x.qty),
  286. })
  287. .collect(),
  288. };
  289. if (data_type == "snapshot")
  290. || (depth_update.last_update_id == 1)
  291. {
  292. orderbook.fetched(&depth_update);
  293. } else if data_type == "delta" {
  294. orderbook.update_depth_cache(&depth_update);
  295. let _ = output
  296. .send(Event::DepthReceived(
  297. ticker,
  298. time,
  299. orderbook.get_depth(),
  300. std::mem::take(&mut trades_buffer),
  301. ))
  302. .await;
  303. }
  304. }
  305. _ => {
  306. log::warn!("Unknown data: {:?}", &data);
  307. }
  308. }
  309. }
  310. }
  311. OpCode::Close => {
  312. state = State::Disconnected;
  313. let _ = output
  314. .send(Event::Disconnected("Connection closed".to_string()))
  315. .await;
  316. }
  317. _ => {}
  318. },
  319. Err(e) => {
  320. state = State::Disconnected;
  321. let _ = output
  322. .send(Event::Disconnected(
  323. "Error reading frame: ".to_string() + &e.to_string(),
  324. ))
  325. .await;
  326. }
  327. },
  328. }
  329. }
  330. })
  331. }
  332. pub fn connect_kline_stream(
  333. streams: Vec<(Ticker, Timeframe)>,
  334. market_type: MarketType
  335. ) -> impl Stream<Item = Event> {
  336. stream::channel(100, move |mut output| async move {
  337. let mut state = State::Disconnected;
  338. let stream_str = streams
  339. .iter()
  340. .map(|(ticker, timeframe)| {
  341. let timeframe_str = timeframe.to_minutes().to_string();
  342. format!("kline.{timeframe_str}.{}", ticker.get_string().0)
  343. })
  344. .collect::<Vec<String>>();
  345. loop {
  346. match &mut state {
  347. State::Disconnected => {
  348. let domain = "stream.bybit.com";
  349. if let Ok(mut websocket) = connect(domain, market_type).await {
  350. let subscribe_message = serde_json::json!({
  351. "op": "subscribe",
  352. "args": stream_str
  353. })
  354. .to_string();
  355. if let Err(e) = websocket
  356. .write_frame(Frame::text(fastwebsockets::Payload::Borrowed(
  357. subscribe_message.as_bytes(),
  358. )))
  359. .await
  360. {
  361. let _ = output
  362. .send(Event::Disconnected(format!("Failed subscribing: {e}")))
  363. .await;
  364. continue;
  365. }
  366. state = State::Connected(websocket);
  367. let _ = output.send(Event::Connected(Connection)).await;
  368. } else {
  369. tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
  370. let _ = output
  371. .send(Event::Disconnected(
  372. "Failed to connect to websocket".to_string(),
  373. ))
  374. .await;
  375. }
  376. }
  377. State::Connected(websocket) => match websocket.read_frame().await {
  378. Ok(msg) => {
  379. if msg.opcode == OpCode::Text {
  380. if let Ok(StreamData::Kline(ticker, de_kline_vec)) =
  381. feed_de(&msg.payload[..], None, market_type)
  382. {
  383. for de_kline in &de_kline_vec {
  384. let kline = Kline {
  385. time: de_kline.time,
  386. open: str_f32_parse(&de_kline.open),
  387. high: str_f32_parse(&de_kline.high),
  388. low: str_f32_parse(&de_kline.low),
  389. close: str_f32_parse(&de_kline.close),
  390. volume: (-1.0, str_f32_parse(&de_kline.volume)),
  391. };
  392. if let Some(timeframe) = string_to_timeframe(&de_kline.interval)
  393. {
  394. let _ = output
  395. .send(Event::KlineReceived(ticker, kline, timeframe))
  396. .await;
  397. } else {
  398. log::error!(
  399. "Failed to find timeframe: {}, {:?}",
  400. &de_kline.interval,
  401. streams
  402. );
  403. }
  404. }
  405. }
  406. }
  407. }
  408. Err(e) => {
  409. state = State::Disconnected;
  410. let _ = output
  411. .send(Event::Disconnected(
  412. "Error reading frame: ".to_string() + &e.to_string(),
  413. ))
  414. .await;
  415. }
  416. },
  417. }
  418. }
  419. })
  420. }
  421. #[derive(Debug, Clone, Copy, PartialEq, Deserialize)]
  422. #[serde(rename_all = "camelCase")]
  423. struct DeOpenInterest {
  424. #[serde(rename = "openInterest", deserialize_with = "deserialize_string_to_f32")]
  425. pub value: f32,
  426. #[serde(deserialize_with = "deserialize_string_to_i64")]
  427. pub timestamp: i64,
  428. }
  429. pub async fn fetch_historical_oi(
  430. ticker: Ticker,
  431. range: Option<(i64, i64)>,
  432. period: Timeframe,
  433. ) -> Result<Vec<OpenInterest>, StreamError> {
  434. let ticker_str = ticker.get_string().0.to_uppercase();
  435. let period_str = match period {
  436. Timeframe::M5 => "5min",
  437. Timeframe::M15 => "15min",
  438. Timeframe::M30 => "30min",
  439. Timeframe::H1 => "1h",
  440. Timeframe::H4 => "4h",
  441. _ => {
  442. let err_msg = format!("Unsupported timeframe for open interest: {}", period);
  443. log::error!("{}", err_msg);
  444. return Err(StreamError::UnknownError(err_msg));
  445. }
  446. };
  447. let mut url = format!(
  448. "https://api.bybit.com/v5/market/open-interest?category=linear&symbol={}&intervalTime={}",
  449. ticker_str, period_str,
  450. );
  451. if let Some((start, end)) = range {
  452. let interval_ms = period.to_milliseconds() as i64;
  453. let num_intervals = ((end - start) / interval_ms).min(200);
  454. url.push_str(&format!("&startTime={start}&endTime={end}&limit={num_intervals}"));
  455. } else {
  456. url.push_str(&format!("&limit={}", 200));
  457. }
  458. let response = reqwest::get(&url)
  459. .await
  460. .map_err(|e| {
  461. log::error!("Failed to fetch from {}: {}", url, e);
  462. StreamError::FetchError(e)
  463. })?;
  464. let text = response.text()
  465. .await
  466. .map_err(|e| {
  467. log::error!("Failed to get response text from {}: {}", url, e);
  468. StreamError::FetchError(e)
  469. })?;
  470. let content: Value = sonic_rs::from_str(&text)
  471. .map_err(|e| {
  472. log::error!("Failed to parse JSON from {}: {}\nResponse: {}", url, e, text);
  473. StreamError::ParseError(e.to_string())
  474. })?;
  475. let result_list = content["result"]["list"]
  476. .as_array()
  477. .ok_or_else(|| {
  478. log::error!("Result list is not an array in response: {}", text);
  479. StreamError::ParseError("Result list is not an array".to_string())
  480. })?;
  481. let bybit_oi: Vec<DeOpenInterest> = serde_json::from_value(json!(result_list))
  482. .map_err(|e| {
  483. log::error!("Failed to parse open interest array: {}\nResponse: {}", e, text);
  484. StreamError::ParseError(format!("Failed to parse open interest: {e}"))
  485. })?;
  486. let open_interest = bybit_oi
  487. .into_iter()
  488. .map(|x| OpenInterest {
  489. time: x.timestamp,
  490. value: x.value,
  491. })
  492. .collect();
  493. Ok(open_interest)
  494. }
  495. #[allow(dead_code)]
  496. #[derive(Deserialize, Debug)]
  497. struct ApiResponse {
  498. #[serde(rename = "retCode")]
  499. ret_code: u32,
  500. #[serde(rename = "retMsg")]
  501. ret_msg: String,
  502. result: ApiResult,
  503. }
  504. #[allow(dead_code)]
  505. #[derive(Deserialize, Debug)]
  506. struct ApiResult {
  507. symbol: String,
  508. category: String,
  509. list: Vec<Vec<Value>>,
  510. }
  511. pub async fn fetch_klines(
  512. ticker: Ticker,
  513. timeframe: Timeframe,
  514. range: Option<(i64, i64)>,
  515. ) -> Result<Vec<Kline>, StreamError> {
  516. let (symbol_str, market_type) = &ticker.get_string();
  517. let timeframe_str = timeframe.to_minutes().to_string();
  518. fn parse_kline_field<T: std::str::FromStr>(field: Option<&str>) -> Result<T, StreamError> {
  519. field
  520. .ok_or_else(|| StreamError::ParseError("Failed to parse kline".to_string()))
  521. .and_then(|s| {
  522. s.parse::<T>()
  523. .map_err(|_| StreamError::ParseError("Failed to parse kline".to_string()))
  524. })
  525. }
  526. let market = match market_type {
  527. MarketType::Spot => "spot",
  528. MarketType::LinearPerps => "linear",
  529. };
  530. let mut url = format!(
  531. "https://api.bybit.com/v5/market/kline?category={}&symbol={}&interval={}",
  532. market, symbol_str.to_uppercase(), timeframe_str
  533. );
  534. if let Some((start, end)) = range {
  535. let interval_ms = timeframe.to_milliseconds() as i64;
  536. let num_intervals = ((end - start) / interval_ms).min(1000);
  537. url.push_str(&format!("&start={start}&end={end}&limit={num_intervals}"));
  538. } else {
  539. url.push_str(&format!("&limit={}", 200));
  540. }
  541. let response: reqwest::Response = reqwest::get(&url).await.map_err(StreamError::FetchError)?;
  542. let text = response.text().await.map_err(StreamError::FetchError)?;
  543. let api_response: ApiResponse =
  544. sonic_rs::from_str(&text).map_err(|e| StreamError::ParseError(e.to_string()))?;
  545. let klines: Result<Vec<Kline>, StreamError> = api_response
  546. .result
  547. .list
  548. .iter()
  549. .map(|kline| {
  550. let time = parse_kline_field::<u64>(kline[0].as_str())?;
  551. let open = parse_kline_field::<f32>(kline[1].as_str())?;
  552. let high = parse_kline_field::<f32>(kline[2].as_str())?;
  553. let low = parse_kline_field::<f32>(kline[3].as_str())?;
  554. let close = parse_kline_field::<f32>(kline[4].as_str())?;
  555. let volume = parse_kline_field::<f32>(kline[5].as_str())?;
  556. Ok(Kline {
  557. time,
  558. open,
  559. high,
  560. low,
  561. close,
  562. volume: (-1.0, volume),
  563. })
  564. })
  565. .collect();
  566. klines
  567. }
  568. pub async fn fetch_ticksize(market_type: MarketType) -> Result<HashMap<Ticker, Option<TickerInfo>>, StreamError> {
  569. let market = match market_type {
  570. MarketType::Spot => "spot",
  571. MarketType::LinearPerps => "linear",
  572. };
  573. let url = format!("https://api.bybit.com/v5/market/instruments-info?category={market}");
  574. let response = reqwest::get(&url).await.map_err(StreamError::FetchError)?;
  575. let text = response.text().await.map_err(StreamError::FetchError)?;
  576. let exchange_info: Value =
  577. sonic_rs::from_str(&text).map_err(|e| StreamError::ParseError(e.to_string()))?;
  578. let result_list: &Vec<Value> = exchange_info["result"]["list"]
  579. .as_array()
  580. .ok_or_else(|| StreamError::ParseError("Result list is not an array".to_string()))?;
  581. let mut ticker_info_map = HashMap::new();
  582. let re = Regex::new(r"^[a-zA-Z0-9]+$").unwrap();
  583. for item in result_list {
  584. let symbol = item["symbol"]
  585. .as_str()
  586. .ok_or_else(|| StreamError::ParseError("Symbol not found".to_string()))?;
  587. if !re.is_match(symbol) {
  588. continue;
  589. }
  590. let price_filter = item["priceFilter"]
  591. .as_object()
  592. .ok_or_else(|| StreamError::ParseError("Price filter not found".to_string()))?;
  593. let tick_size = price_filter["tickSize"]
  594. .as_str()
  595. .ok_or_else(|| StreamError::ParseError("Tick size not found".to_string()))?
  596. .parse::<f32>()
  597. .map_err(|_| StreamError::ParseError("Failed to parse tick size".to_string()))?;
  598. ticker_info_map.insert(Ticker::new(symbol, market_type), Some(TickerInfo { tick_size, market_type }));
  599. }
  600. Ok(ticker_info_map)
  601. }
  602. pub async fn fetch_ticker_prices(market_type: MarketType) -> Result<HashMap<Ticker, TickerStats>, StreamError> {
  603. let market = match market_type {
  604. MarketType::Spot => "spot",
  605. MarketType::LinearPerps => "linear",
  606. };
  607. let url = format!("https://api.bybit.com/v5/market/tickers?category={market}");
  608. let response = reqwest::get(&url).await.map_err(StreamError::FetchError)?;
  609. let text = response.text().await.map_err(StreamError::FetchError)?;
  610. let exchange_info: Value =
  611. sonic_rs::from_str(&text).map_err(|e| StreamError::ParseError(e.to_string()))?;
  612. let result_list: &Vec<Value> = exchange_info["result"]["list"]
  613. .as_array()
  614. .ok_or_else(|| StreamError::ParseError("Result list is not an array".to_string()))?;
  615. let mut ticker_prices_map = HashMap::new();
  616. let re = Regex::new(r"^[a-zA-Z0-9]+$").unwrap();
  617. for item in result_list {
  618. let symbol = item["symbol"]
  619. .as_str()
  620. .ok_or_else(|| StreamError::ParseError("Symbol not found".to_string()))?;
  621. if !re.is_match(symbol) {
  622. continue;
  623. }
  624. let mark_price = item["lastPrice"]
  625. .as_str()
  626. .ok_or_else(|| StreamError::ParseError("Mark price not found".to_string()))?
  627. .parse::<f32>()
  628. .map_err(|_| StreamError::ParseError("Failed to parse mark price".to_string()))?;
  629. let daily_price_chg = item["price24hPcnt"]
  630. .as_str()
  631. .ok_or_else(|| StreamError::ParseError("Daily price change not found".to_string()))?
  632. .parse::<f32>()
  633. .map_err(|_| {
  634. StreamError::ParseError("Failed to parse daily price change".to_string())
  635. })?;
  636. let daily_volume = item["volume24h"]
  637. .as_str()
  638. .ok_or_else(|| StreamError::ParseError("Daily volume not found".to_string()))?
  639. .parse::<f32>()
  640. .map_err(|_| StreamError::ParseError("Failed to parse daily volume".to_string()))?;
  641. let quote_volume = daily_volume * mark_price;
  642. if quote_volume < 4_000_000.0 {
  643. continue;
  644. }
  645. let ticker_stats = TickerStats {
  646. mark_price,
  647. daily_price_chg: daily_price_chg * 100.0,
  648. daily_volume: quote_volume,
  649. };
  650. ticker_prices_map.insert(Ticker::new(symbol, market_type), ticker_stats);
  651. }
  652. Ok(ticker_prices_map)
  653. }