market_data.rs 30 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844
  1. use hyper::client::conn;
  2. use iced::futures;
  3. use iced::subscription::{self, Subscription};
  4. use serde::{de, Deserializer};
  5. use futures::sink::SinkExt;
  6. use serde_json::Value;
  7. use crate::{Ticker, Timeframe};
  8. use bytes::Bytes;
  9. use sonic_rs::{LazyValue, JsonValueTrait};
  10. use sonic_rs::{Deserialize, Serialize};
  11. use sonic_rs::{to_array_iter, to_object_iter_unchecked};
  12. use anyhow::{Context, Result};
  13. use fastwebsockets::{Frame, FragmentCollector, OpCode};
  14. use http_body_util::Empty;
  15. use hyper::header::{CONNECTION, UPGRADE};
  16. use hyper::upgrade::Upgraded;
  17. use hyper::Request;
  18. use hyper_util::rt::TokioIo;
  19. use tokio::net::TcpStream;
  20. use tokio_rustls::rustls::{ClientConfig, OwnedTrustAnchor};
  21. use tokio_rustls::TlsConnector;
  22. #[allow(clippy::large_enum_variant)]
  23. enum State {
  24. Disconnected,
  25. Connected(
  26. FragmentCollector<TokioIo<Upgraded>>
  27. ),
  28. }
  /// Latency sample attached to every depth event forwarded to the UI.
  #[derive(Clone, Copy, Debug)]
  pub struct FeedLatency {
      /// Timestamp carried by the depth event this sample belongs to.
      pub time: i64,
      /// Local wall-clock milliseconds minus the depth event's timestamp.
      pub depth_latency: i64,
      /// Mean of the trade latencies gathered since the previous depth event,
      /// or `None` when no trade arrived in between.
      pub trade_latency: Option<i64>,
  }
  35. #[derive(Debug, Clone)]
  36. pub enum Event {
  37. Connected(Connection),
  38. Disconnected,
  39. DepthReceived(FeedLatency, i64, LocalDepthCache, Vec<Trade>),
  40. KlineReceived(Kline, Timeframe),
  41. }
  /// Marker payload of [`Event::Connected`]; it carries no data.
  #[derive(Clone, Debug)]
  pub struct Connection;
  44. impl<'de> Deserialize<'de> for Order {
  45. fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
  46. where
  47. D: Deserializer<'de>,
  48. {
  49. let arr: Vec<&str> = Vec::<&str>::deserialize(deserializer)?;
  50. let price: f32 = arr[0].parse::<f32>().map_err(serde::de::Error::custom)?;
  51. let qty: f32 = arr[1].parse::<f32>().map_err(serde::de::Error::custom)?;
  52. Ok(Order { price, qty })
  53. }
  54. }
  55. #[derive(Debug, Deserialize, Clone)]
  56. pub struct FetchedDepth {
  57. #[serde(rename = "lastUpdateId")]
  58. update_id: i64,
  59. #[serde(rename = "T")]
  60. time: i64,
  61. #[serde(rename = "bids")]
  62. bids: Vec<Order>,
  63. #[serde(rename = "asks")]
  64. asks: Vec<Order>,
  65. }
  /// One price level of the order book.
  #[derive(Debug, Clone, Copy, Default)]
  pub struct Order {
      pub price: f32,
      /// Quantity resting at `price`; `0.0` in an update means "remove the level".
      pub qty: f32,
  }

  /// The slice of the book close to the best prices, as handed to consumers.
  #[derive(Debug, Clone, Default)]
  pub struct LocalDepthCache {
      /// Timestamp of the depth event this view was built from.
      pub time: i64,
      /// Bids, highest price first.
      pub bids: Box<[Order]>,
      /// Asks, lowest price first.
      pub asks: Box<[Order]>,
  }

  /// Locally maintained order book, also used as the carrier for snapshots and
  /// incremental updates.
  pub struct Depth {
      /// Id of the last snapshot/update merged into this book.
      pub last_update_id: i64,
      /// Timestamp of the last snapshot/update merged into this book.
      pub time: i64,
      /// Bid levels, in no particular order.
      pub bids: Vec<Order>,
      /// Ask levels, in no particular order.
      pub asks: Vec<Order>,
  }

  impl Depth {
      /// Creates an empty book with id and time set to zero.
      pub fn new() -> Self {
          Self {
              last_update_id: 0,
              time: 0,
              bids: Vec::new(),
              asks: Vec::new(),
          }
      }

      /// Replaces the whole book with a freshly fetched snapshot.
      pub fn fetched(&mut self, new_depth: Depth) {
          *self = new_depth;
      }

      /// Merges `updates` into one side of the book.
      ///
      /// A zero quantity removes the level, a known price overwrites its
      /// quantity, and an unknown price is appended. Prices are compared with
      /// exact float equality.
      fn apply_side(levels: &mut Vec<Order>, updates: &[Order]) {
          for update in updates {
              if update.qty == 0.0 {
                  levels.retain(|level| level.price != update.price);
              } else if let Some(level) =
                  levels.iter_mut().find(|level| level.price == update.price)
              {
                  level.qty = update.qty;
              } else {
                  levels.push(*update);
              }
          }
      }

      /// Applies incremental bid and ask updates to the book.
      pub fn update_depth_cache(&mut self, new_bids: &[Order], new_asks: &[Order]) {
          Self::apply_side(&mut self.bids, new_bids);
          Self::apply_side(&mut self.asks, new_asks);
      }

      /// Merges `new_depth` into the book and returns the levels near the top
      /// of the book as `(bids, asks)`.
      ///
      /// Bids are kept down to 0.1 % below the best bid and sorted from highest
      /// to lowest price; asks are kept up to 0.1 % above the best ask and
      /// sorted from lowest to highest.
      pub fn update_levels(&mut self, new_depth: Depth) -> (Box<[Order]>, Box<[Order]>) {
          self.last_update_id = new_depth.last_update_id;
          self.time = new_depth.time;

          // Merge first: the price window must be derived from the book as it
          // looks *after* this update. Using the previous best prices would
          // return an empty side whenever the old best levels were just removed.
          self.update_depth_cache(&new_depth.bids, &new_depth.asks);

          let best_bid_price = self.bids.iter().map(|order| order.price).fold(0.0f32, f32::max);
          let best_ask_price = self.asks.iter().map(|order| order.price).fold(f32::MAX, f32::min);

          let lowest: f32 = best_bid_price * 0.999;
          let highest: f32 = best_ask_price * 1.001;

          let mut local_bids: Vec<Order> = self
              .bids
              .iter()
              .copied()
              .filter(|order| order.price >= lowest)
              .collect();
          let mut local_asks: Vec<Order> = self
              .asks
              .iter()
              .copied()
              .filter(|order| order.price <= highest)
              .collect();

          // `total_cmp` gives a total order, so no `unwrap` is needed in the
          // comparators.
          local_bids.sort_by(|a, b| b.price.total_cmp(&a.price));
          local_asks.sort_by(|a, b| a.price.total_cmp(&b.price));

          (local_bids.into_boxed_slice(), local_asks.into_boxed_slice())
      }

      /// Returns the id of the last snapshot/update merged into the book.
      pub fn get_fetch_id(&self) -> i64 {
          self.last_update_id
      }
  }
  /// A single executed trade, converted from the raw stream payload.
  #[derive(Clone, Copy, Debug)]
  pub struct Trade {
      /// Timestamp carried by the trade event.
      pub time: i64,
      /// Sell flag as delivered by the stream.
      pub is_sell: bool,
      pub price: f32,
      pub qty: f32,
  }
  168. #[derive(Serialize, Deserialize, Debug)]
  169. struct SonicDepth {
  170. #[serde(rename = "T")]
  171. time: u64,
  172. #[serde(rename = "U")]
  173. first_id: u64,
  174. #[serde(rename = "u")]
  175. final_id: u64,
  176. #[serde(rename = "pu")]
  177. prev_final_id: u64,
  178. #[serde(rename = "b")]
  179. bids: Vec<BidAsk>,
  180. #[serde(rename = "a")]
  181. asks: Vec<BidAsk>,
  182. }
  183. #[derive(Serialize, Deserialize, Debug)]
  184. struct BidAsk {
  185. #[serde(rename = "0")]
  186. price: String,
  187. #[serde(rename = "1")]
  188. qty: String,
  189. }
  190. #[derive(Serialize, Deserialize, Debug)]
  191. struct SonicTrade {
  192. #[serde(rename = "T")]
  193. time: u64,
  194. #[serde(rename = "p")]
  195. price: String,
  196. #[serde(rename = "q")]
  197. qty: String,
  198. #[serde(rename = "m")]
  199. is_sell: bool,
  200. }
  201. #[derive(Deserialize, Debug, Clone)]
  202. struct SonicKline {
  203. #[serde(rename = "t")]
  204. time: u64,
  205. #[serde(rename = "o")]
  206. open: String,
  207. #[serde(rename = "h")]
  208. high: String,
  209. #[serde(rename = "l")]
  210. low: String,
  211. #[serde(rename = "c")]
  212. close: String,
  213. #[serde(rename = "v")]
  214. volume: String,
  215. #[serde(rename = "V")]
  216. taker_buy_base_asset_volume: String,
  217. #[serde(rename = "i")]
  218. interval: String,
  219. }
  220. #[derive(Deserialize, Debug, Clone)]
  221. struct SonicKlineWrap {
  222. #[serde(rename = "k")]
  223. kline: SonicKline,
  224. }
  225. #[derive(Debug)]
  226. enum StreamData {
  227. Trade(SonicTrade),
  228. Depth(SonicDepth),
  229. Kline(SonicKline),
  230. }
  /// Kind of stream a combined-stream message belongs to, derived from its
  /// `stream` name.
  #[derive(Debug)]
  enum StreamName {
      Depth,
      Trade,
      Kline,
      /// The name does not match any stream this module subscribes to.
      Unknown,
  }

  impl StreamName {
      /// Classifies `stream_type` (e.g. `"btcusdt@trade"`) for the given
      /// lower-case `symbol`.
      ///
      /// Recognised names are `{symbol}@depth@100ms`, `{symbol}@trade` and
      /// anything starting with `{symbol}@kline_`; everything else is
      /// [`StreamName::Unknown`].
      fn from_symbol_and_type(symbol: &str, stream_type: &str) -> Self {
          // This runs once per incoming message, so strip the symbol prefix
          // instead of allocating a formatted string for every candidate name.
          match stream_type.strip_prefix(symbol) {
              Some("@depth@100ms") => StreamName::Depth,
              Some("@trade") => StreamName::Trade,
              Some(suffix) if suffix.starts_with("@kline_") => StreamName::Kline,
              _ => StreamName::Unknown,
          }
      }
  }
  /// Kind of payload expected under the `data` key, remembered by `feed_de`
  /// after it has read the `stream` key.
  #[derive(Debug)]
  enum StreamWrapper {
      /// The `data` object is a trade event.
      Trade,
      /// The `data` object is a depth event.
      Depth,
      /// The `data` object wraps a candlestick.
      Kline,
  }
  254. fn feed_de(bytes: &Bytes, symbol: &str) -> Result<StreamData> {
  255. let mut stream_type: Option<StreamWrapper> = None;
  256. let iter: sonic_rs::ObjectJsonIter = unsafe { to_object_iter_unchecked(bytes) };
  257. for elem in iter {
  258. let (k, v) = elem
  259. .context("Error parsing stream")?;
  260. if k == "stream" {
  261. if let Some(val) = v.as_str() {
  262. match StreamName::from_symbol_and_type(symbol, val) {
  263. StreamName::Depth => {
  264. stream_type = Some(StreamWrapper::Depth);
  265. },
  266. StreamName::Trade => {
  267. stream_type = Some(StreamWrapper::Trade);
  268. },
  269. StreamName::Kline => {
  270. stream_type = Some(StreamWrapper::Kline);
  271. },
  272. _ => {
  273. eprintln!("Unknown stream name");
  274. }
  275. }
  276. }
  277. } else if k == "data" {
  278. match stream_type {
  279. Some(StreamWrapper::Trade) => {
  280. let trade: SonicTrade = sonic_rs::from_str(&v.as_raw_faststr())
  281. .context("Error parsing trade")?;
  282. return Ok(StreamData::Trade(trade));
  283. },
  284. Some(StreamWrapper::Depth) => {
  285. let depth: SonicDepth = sonic_rs::from_str(&v.as_raw_faststr())
  286. .context("Error parsing depth")?;
  287. return Ok(StreamData::Depth(depth));
  288. },
  289. Some(StreamWrapper::Kline) => {
  290. let kline_wrap: SonicKlineWrap = sonic_rs::from_str(&v.as_raw_faststr())
  291. .context("Error parsing kline")?;
  292. return Ok(StreamData::Kline(kline_wrap.kline));
  293. },
  294. _ => {
  295. eprintln!("Unknown stream type");
  296. }
  297. }
  298. } else {
  299. eprintln!("Unknown data: {:?}", k);
  300. }
  301. }
  302. Err(anyhow::anyhow!("Unknown data"))
  303. }
  304. fn tls_connector() -> Result<TlsConnector> {
  305. let mut root_store = tokio_rustls::rustls::RootCertStore::empty();
  306. root_store.add_trust_anchors(
  307. webpki_roots::TLS_SERVER_ROOTS.0.iter().map(|ta| {
  308. OwnedTrustAnchor::from_subject_spki_name_constraints(
  309. ta.subject,
  310. ta.spki,
  311. ta.name_constraints,
  312. )
  313. }),
  314. );
  315. let config = ClientConfig::builder()
  316. .with_safe_defaults()
  317. .with_root_certificates(root_store)
  318. .with_no_client_auth();
  319. Ok(TlsConnector::from(std::sync::Arc::new(config)))
  320. }
  321. async fn connect(domain: &str, streams: &str) -> Result<FragmentCollector<TokioIo<Upgraded>>> {
  322. let mut addr = String::from(domain);
  323. addr.push_str(":443");
  324. let tcp_stream: TcpStream = TcpStream::connect(&addr).await?;
  325. let tls_connector: TlsConnector = tls_connector().unwrap();
  326. let domain: tokio_rustls::rustls::ServerName =
  327. tokio_rustls::rustls::ServerName::try_from(domain).map_err(|_| {
  328. std::io::Error::new(std::io::ErrorKind::InvalidInput, "invalid dnsname")
  329. })?;
  330. let tls_stream: tokio_rustls::client::TlsStream<TcpStream> = tls_connector.connect(domain, tcp_stream).await?;
  331. let url = format!("wss://{}/stream?streams={}", &addr, streams);
  332. println!("Connecting to {}", url);
  333. let req: Request<Empty<Bytes>> = Request::builder()
  334. .method("GET")
  335. .uri(url)
  336. .header("Host", &addr)
  337. .header(UPGRADE, "websocket")
  338. .header(CONNECTION, "upgrade")
  339. .header(
  340. "Sec-WebSocket-Key",
  341. fastwebsockets::handshake::generate_key(),
  342. )
  343. .header("Sec-WebSocket-Version", "13")
  344. .body(Empty::<Bytes>::new())?;
  345. let (ws, _) = fastwebsockets::handshake::client(&SpawnExecutor, req, tls_stream).await?;
  346. Ok(FragmentCollector::new(ws))
  347. }
  348. struct SpawnExecutor;
  349. impl<Fut> hyper::rt::Executor<Fut> for SpawnExecutor
  350. where
  351. Fut: std::future::Future + Send + 'static,
  352. Fut::Output: Send + 'static,
  353. {
  354. fn execute(&self, fut: Fut) {
  355. tokio::task::spawn(fut);
  356. }
  357. }
  358. pub fn connect_market_stream(selected_ticker: Ticker) -> Subscription<Event> {
  359. struct Connect;
  360. subscription::channel(
  361. std::any::TypeId::of::<Connect>(),
  362. 100,
  363. move |mut output| async move {
  364. let mut state = State::Disconnected;
  365. let mut trades_buffer: Vec<Trade> = Vec::new();
  366. let symbol_str = match selected_ticker {
  367. Ticker::BTCUSDT => "btcusdt",
  368. Ticker::ETHUSDT => "ethusdt",
  369. Ticker::SOLUSDT => "solusdt",
  370. Ticker::LTCUSDT => "ltcusdt",
  371. };
  372. let stream_1 = format!("{symbol_str}@trade");
  373. let stream_2 = format!("{symbol_str}@depth@100ms");
  374. let mut orderbook: Depth = Depth::new();
  375. let mut already_fetching: bool = false;
  376. let mut prev_id: u64 = 0;
  377. let mut trade_latencies: Vec<i64> = Vec::new();
  378. loop {
  379. match &mut state {
  380. State::Disconnected => {
  381. let streams = format!("{stream_1}/{stream_2}");
  382. let domain: &str = "fstream.binance.com";
  383. if let Ok(websocket) = connect(domain, streams.as_str()
  384. )
  385. .await {
  386. let (tx, rx) = tokio::sync::oneshot::channel();
  387. tokio::spawn(async move {
  388. let fetched_depth = fetch_depth(selected_ticker).await;
  389. let depth: Depth = match fetched_depth {
  390. Ok(depth) => {
  391. Depth {
  392. last_update_id: depth.update_id,
  393. time: depth.time,
  394. bids: depth.bids,
  395. asks: depth.asks,
  396. }
  397. },
  398. Err(_) => return,
  399. };
  400. let _ = tx.send(depth);
  401. });
  402. match rx.await {
  403. Ok(depth) => {
  404. orderbook.fetched(depth);
  405. state = State::Connected(websocket);
  406. },
  407. Err(_) => output.send(Event::Disconnected).await.expect("Trying to send disconnect event..."),
  408. }
  409. } else {
  410. tokio::time::sleep(tokio::time::Duration::from_secs(1))
  411. .await;
  412. let _ = output.send(Event::Disconnected).await;
  413. }
  414. },
  415. State::Connected(ws) => {
  416. let feed_latency: FeedLatency;
  417. match ws.read_frame().await {
  418. Ok(msg) => match msg.opcode {
  419. OpCode::Text => {
  420. let json_bytes: Bytes = Bytes::from(msg.payload.to_vec());
  421. if let Ok(data) = feed_de(&json_bytes, symbol_str) {
  422. match data {
  423. StreamData::Trade(de_trade) => {
  424. let trade = Trade {
  425. time: de_trade.time as i64,
  426. is_sell: de_trade.is_sell,
  427. price: str_f32_parse(&de_trade.price),
  428. qty: str_f32_parse(&de_trade.qty),
  429. };
  430. trade_latencies.push(
  431. chrono::Utc::now().timestamp_millis() - trade.time
  432. );
  433. trades_buffer.push(trade);
  434. },
  435. StreamData::Depth(de_depth) => {
  436. if already_fetching {
  437. println!("Already fetching...\n");
  438. continue;
  439. }
  440. let last_update_id = orderbook.get_fetch_id() as u64;
  441. if (de_depth.final_id <= last_update_id) || last_update_id == 0 {
  442. continue;
  443. }
  444. if prev_id == 0 && (de_depth.first_id > last_update_id + 1) || (last_update_id + 1 > de_depth.final_id) {
  445. println!("Out of sync at first event. Trying to resync...\n");
  446. let (tx, rx) = tokio::sync::oneshot::channel();
  447. already_fetching = true;
  448. tokio::spawn(async move {
  449. let fetched_depth = fetch_depth(selected_ticker).await;
  450. let depth: Depth = match fetched_depth {
  451. Ok(depth) => {
  452. Depth {
  453. last_update_id: depth.update_id,
  454. time: depth.time,
  455. bids: depth.bids,
  456. asks: depth.asks,
  457. }
  458. },
  459. Err(_) => return,
  460. };
  461. let _ = tx.send(depth);
  462. });
  463. match rx.await {
  464. Ok(depth) => {
  465. orderbook.fetched(depth)
  466. },
  467. Err(_) => {
  468. state = State::Disconnected;
  469. output.send(Event::Disconnected).await.expect("Trying to send disconnect event...");
  470. },
  471. }
  472. already_fetching = false;
  473. }
  474. if (prev_id == 0) || (prev_id == de_depth.prev_final_id) {
  475. let time = de_depth.time as i64;
  476. let depth_latency = chrono::Utc::now().timestamp_millis() - time;
  477. let depth_update = Depth {
  478. last_update_id: de_depth.final_id as i64,
  479. time,
  480. bids: de_depth.bids.iter().map(|x| Order { price: str_f32_parse(&x.price), qty: str_f32_parse(&x.qty) }).collect(),
  481. asks: de_depth.asks.iter().map(|x| Order { price: str_f32_parse(&x.price), qty: str_f32_parse(&x.qty) }).collect(),
  482. };
  483. let (local_bids, local_asks) = orderbook.update_levels(depth_update);
  484. let local_depth_cache = LocalDepthCache {
  485. time,
  486. bids: local_bids,
  487. asks: local_asks,
  488. };
  489. let avg_trade_latency = if !trade_latencies.is_empty() {
  490. let avg = trade_latencies.iter().sum::<i64>() / trade_latencies.len() as i64;
  491. trade_latencies.clear();
  492. Some(avg)
  493. } else {
  494. None
  495. };
  496. feed_latency = FeedLatency {
  497. time,
  498. depth_latency,
  499. trade_latency: avg_trade_latency,
  500. };
  501. let _ = output.send(
  502. Event::DepthReceived(
  503. feed_latency,
  504. time,
  505. local_depth_cache,
  506. std::mem::take(&mut trades_buffer)
  507. )
  508. ).await;
  509. prev_id = de_depth.final_id;
  510. } else {
  511. eprintln!("Out of sync...\n");
  512. }
  513. },
  514. _ => {}
  515. }
  516. } else {
  517. eprintln!("\nUnknown data: {:?}", &json_bytes);
  518. }
  519. }
  520. OpCode::Close => {
  521. eprintln!("Connection closed");
  522. let _ = output.send(Event::Disconnected).await;
  523. }
  524. _ => {}
  525. },
  526. Err(e) => {
  527. println!("Error reading frame: {}", e);
  528. }
  529. };
  530. }
  531. }
  532. }
  533. },
  534. )
  535. }
  536. pub fn connect_kline_stream(vec: Vec<(Ticker, Timeframe)>) -> Subscription<Event> {
  537. struct Connect;
  538. subscription::channel(
  539. std::any::TypeId::of::<Connect>(),
  540. 100,
  541. move |mut output| async move {
  542. let mut state = State::Disconnected;
  543. let mut symbol_str: &str = "";
  544. let stream_str = vec.iter().map(|(ticker, timeframe)| {
  545. symbol_str = match ticker {
  546. Ticker::BTCUSDT => "btcusdt",
  547. Ticker::ETHUSDT => "ethusdt",
  548. Ticker::SOLUSDT => "solusdt",
  549. Ticker::LTCUSDT => "ltcusdt",
  550. };
  551. let timeframe_str = match timeframe {
  552. Timeframe::M1 => "1m",
  553. Timeframe::M3 => "3m",
  554. Timeframe::M5 => "5m",
  555. Timeframe::M15 => "15m",
  556. Timeframe::M30 => "30m",
  557. };
  558. format!("{symbol_str}@kline_{timeframe_str}")
  559. }).collect::<Vec<String>>().join("/");
  560. loop {
  561. match &mut state {
  562. State::Disconnected => {
  563. let domain: &str = "fstream.binance.com";
  564. let streams = stream_str.as_str();
  565. if let Ok(websocket) = connect(
  566. domain, streams
  567. )
  568. .await {
  569. state = State::Connected(websocket);
  570. } else {
  571. tokio::time::sleep(tokio::time::Duration::from_secs(1))
  572. .await;
  573. let _ = output.send(Event::Disconnected).await;
  574. }
  575. },
  576. State::Connected(ws) => {
  577. match ws.read_frame().await {
  578. Ok(msg) => match msg.opcode {
  579. OpCode::Text => {
  580. let json_bytes: Bytes = Bytes::from(msg.payload.to_vec());
  581. if let Ok(StreamData::Kline(de_kline)) = feed_de(&json_bytes, symbol_str) {
  582. let kline = Kline {
  583. time: de_kline.time,
  584. open: str_f32_parse(&de_kline.open),
  585. high: str_f32_parse(&de_kline.high),
  586. low: str_f32_parse(&de_kline.low),
  587. close: str_f32_parse(&de_kline.close),
  588. volume: str_f32_parse(&de_kline.volume),
  589. taker_buy_base_asset_volume: str_f32_parse(&de_kline.taker_buy_base_asset_volume),
  590. };
  591. if let Some(timeframe) = vec.iter().find(|(_, tf)| tf.to_string() == de_kline.interval) {
  592. let _ = output.send(Event::KlineReceived(kline, timeframe.1)).await;
  593. }
  594. } else {
  595. eprintln!("\nUnknown data: {:?}", &json_bytes);
  596. }
  597. }
  598. _ => {}
  599. },
  600. Err(e) => {
  601. eprintln!("Error reading frame: {}", e);
  602. }
  603. }
  604. }
  605. }
  606. }
  607. },
  608. )
  609. }
  /// Parses `s` as an `f32`, logging the failure to stderr and falling back
  /// to `0.0` when the text is not a valid float.
  fn str_f32_parse(s: &str) -> f32 {
      match s.parse::<f32>() {
          Ok(value) => value,
          Err(e) => {
              eprintln!("Failed to parse float: {}, error: {}", s, e);
              0.0
          }
      }
  }
  616. mod string_to_f32 {
  617. use serde::{self, Deserialize, Deserializer};
  618. pub fn deserialize<'de, D>(deserializer: D) -> Result<f32, D::Error>
  619. where
  620. D: Deserializer<'de>,
  621. {
  622. let s: &str = <&str>::deserialize(deserializer)?;
  623. s.parse::<f32>().map_err(serde::de::Error::custom)
  624. }
  625. }
  /// One candlestick with all numeric values already parsed.
  #[derive(Clone, Copy, Debug)]
  pub struct Kline {
      /// Candle timestamp as delivered by the exchange.
      pub time: u64,
      pub open: f32,
      pub high: f32,
      pub low: f32,
      pub close: f32,
      pub volume: f32,
      pub taker_buy_base_asset_volume: f32,
  }
  636. #[derive(Deserialize, Debug, Clone)]
  637. struct FetchedKlines (
  638. u64,
  639. #[serde(with = "string_to_f32")] f32,
  640. #[serde(with = "string_to_f32")] f32,
  641. #[serde(with = "string_to_f32")] f32,
  642. #[serde(with = "string_to_f32")] f32,
  643. #[serde(with = "string_to_f32")] f32,
  644. u64,
  645. String,
  646. u32,
  647. #[serde(with = "string_to_f32")] f32,
  648. String,
  649. String,
  650. );
  651. impl From<FetchedKlines> for Kline {
  652. fn from(fetched: FetchedKlines) -> Self {
  653. Self {
  654. time: fetched.0,
  655. open: fetched.1,
  656. high: fetched.2,
  657. low: fetched.3,
  658. close: fetched.4,
  659. volume: fetched.5,
  660. taker_buy_base_asset_volume: fetched.9,
  661. }
  662. }
  663. }
  664. pub async fn fetch_klines(ticker: Ticker, timeframe: Timeframe) -> Result<Vec<Kline>, reqwest::Error> {
  665. let symbol_str = match ticker {
  666. Ticker::BTCUSDT => "btcusdt",
  667. Ticker::ETHUSDT => "ethusdt",
  668. Ticker::SOLUSDT => "solusdt",
  669. Ticker::LTCUSDT => "ltcusdt",
  670. };
  671. let timeframe_str = match timeframe {
  672. Timeframe::M1 => "1m",
  673. Timeframe::M3 => "3m",
  674. Timeframe::M5 => "5m",
  675. Timeframe::M15 => "15m",
  676. Timeframe::M30 => "30m",
  677. };
  678. let url = format!("https://fapi.binance.com/fapi/v1/klines?symbol={symbol_str}&interval={timeframe_str}&limit=720");
  679. let response = reqwest::get(&url).await?;
  680. let text = response.text().await?;
  681. let fetched_klines: Result<Vec<FetchedKlines>, _> = serde_json::from_str(&text);
  682. let klines: Vec<Kline> = fetched_klines.unwrap().into_iter().map(Kline::from).collect();
  683. Ok(klines)
  684. }
  685. pub async fn fetch_depth(ticker: Ticker) -> Result<FetchedDepth, reqwest::Error> {
  686. let symbol_str = match ticker {
  687. Ticker::BTCUSDT => "btcusdt",
  688. Ticker::ETHUSDT => "ethusdt",
  689. Ticker::SOLUSDT => "solusdt",
  690. Ticker::LTCUSDT => "ltcusdt",
  691. };
  692. let url = format!("https://fapi.binance.com/fapi/v1/depth?symbol={symbol_str}&limit=500");
  693. let response = reqwest::get(&url).await?;
  694. let text = response.text().await?;
  695. let depth: FetchedDepth = serde_json::from_str(&text).unwrap();
  696. Ok(depth)
  697. }
  698. pub async fn fetch_ticksize(ticker: Ticker) -> Result<f32, reqwest::Error> {
  699. let symbol_str = match ticker {
  700. Ticker::BTCUSDT => "BTCUSDT",
  701. Ticker::ETHUSDT => "ETHUSDT",
  702. Ticker::SOLUSDT => "SOLUSDT",
  703. Ticker::LTCUSDT => "LTCUSDT",
  704. };
  705. let url = format!("https://fapi.binance.com/fapi/v1/exchangeInfo");
  706. let response = reqwest::get(&url).await?;
  707. let text = response.text().await?;
  708. let exchange_info: Value = serde_json::from_str(&text).unwrap();
  709. let symbols = exchange_info["symbols"].as_array().unwrap();
  710. let symbol = symbols.iter().find(|x| x["symbol"].as_str().unwrap() == symbol_str).unwrap();
  711. let tick_size = symbol["filters"].as_array().unwrap().iter().find(|x| x["filterType"].as_str().unwrap() == "PRICE_FILTER").unwrap()["tickSize"].as_str().unwrap().parse::<f32>().unwrap();
  712. Ok(tick_size)
  713. }
  714. pub async fn fetch_server_time() -> Result<i64> {
  715. let url = "https://fapi.binance.com/fapi/v1/time";
  716. let response = reqwest::get(url).await.context("Failed to send request")?;
  717. let text = response.text().await.context("Failed to read response")?;
  718. let server_time: Value = serde_json::from_str(&text).context("Failed to parse JSON")?;
  719. if let Some(time) = server_time["serverTime"].as_i64() {
  720. Ok(time)
  721. } else {
  722. anyhow::bail!("Invalid server time")
  723. }
  724. }