ws_binance.rs 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. use iced::futures;
  2. use iced::subscription::{self, Subscription};
  3. use serde::Deserialize;
  4. mod string_to_f32 {
  5. use serde::{self, Deserialize, Deserializer};
  6. pub fn deserialize<'de, D>(deserializer: D) -> Result<f32, D::Error>
  7. where
  8. D: Deserializer<'de>,
  9. {
  10. let s = String::deserialize(deserializer)?;
  11. s.parse::<f32>().map_err(serde::de::Error::custom)
  12. }
  13. }
  14. use futures::channel::mpsc;
  15. use futures::sink::SinkExt;
  16. use futures::stream::StreamExt;
  17. use futures::FutureExt;
  18. use async_tungstenite::tungstenite;
  19. #[derive(Deserialize, Debug, Clone)]
  20. pub struct StreamWrapper {
  21. pub stream: String,
  22. pub data: serde_json::Value,
  23. }
  24. #[derive(Deserialize, Debug, Clone)]
  25. pub struct Trade {
  26. #[serde(rename = "T")]
  27. pub time: u64,
  28. #[serde(rename = "m")]
  29. pub is_sell: bool,
  30. #[serde(with = "string_to_f32", rename = "p")]
  31. pub price: f32,
  32. #[serde(with = "string_to_f32", rename = "q")]
  33. pub qty: f32,
  34. }
  35. #[derive(Deserialize, Debug, Clone)]
  36. pub struct Kline {
  37. #[serde(rename = "t")]
  38. pub time: u64,
  39. #[serde(with = "string_to_f32", rename = "o")]
  40. pub open: f32,
  41. #[serde(with = "string_to_f32", rename = "h")]
  42. pub high: f32,
  43. #[serde(with = "string_to_f32", rename = "l")]
  44. pub low: f32,
  45. #[serde(with = "string_to_f32", rename = "c")]
  46. pub close: f32,
  47. #[serde(with = "string_to_f32", rename = "v")]
  48. pub volume: f32,
  49. #[serde(with = "string_to_f32", rename = "V")]
  50. pub taker_buy_base_asset_volume: f32,
  51. }
  52. pub fn connect(selected_ticker: String) -> Subscription<Event> {
  53. struct Connect;
  54. subscription::channel(
  55. std::any::TypeId::of::<Connect>(),
  56. 100,
  57. |mut output| async move {
  58. let mut state = State::Disconnected;
  59. let mut trades_buffer = Vec::new();
  60. let buffer_flush_interval = tokio::time::Duration::from_millis(1000 / 30);
  61. loop {
  62. match &mut state {
  63. State::Disconnected => {
  64. let websocket_server = format!("wss://fstream.binance.com/stream?streams={}@aggTrade/{}@kline_1m", selected_ticker.to_lowercase(), selected_ticker.to_lowercase());
  65. match async_tungstenite::tokio::connect_async(
  66. websocket_server,
  67. )
  68. .await
  69. {
  70. Ok((websocket, _)) => {
  71. state = State::Connected(websocket);
  72. }
  73. Err(_) => {
  74. tokio::time::sleep(
  75. tokio::time::Duration::from_secs(1),
  76. )
  77. .await;
  78. let _ = output.send(Event::Disconnected).await;
  79. }
  80. }
  81. }
  82. State::Connected(websocket) => {
  83. let mut fused_websocket = websocket.by_ref().fuse();
  84. futures::select! {
  85. received = fused_websocket.select_next_some() => {
  86. match received {
  87. Ok(tungstenite::Message::Text(message)) => {
  88. let parsed_message: Result<StreamWrapper, _> = serde_json::from_str(&message);
  89. match parsed_message {
  90. Ok(mut wrapper) => {
  91. if wrapper.stream.contains("aggTrade") {
  92. let trade: Result<Trade, _> = serde_json::from_str(&wrapper.data.to_string());
  93. match trade {
  94. Ok(trade) => {
  95. trades_buffer.push(trade);
  96. },
  97. Err(e) => {
  98. dbg!(e);
  99. }
  100. }
  101. } else if wrapper.stream.contains("kline") {
  102. if let Some(kline) = wrapper.data.get_mut("k") {
  103. let kline: Result<Kline, _> = serde_json::from_value(kline.take());
  104. match kline {
  105. Ok(kline) => {
  106. let _ = output.send(Event::KlineReceived(kline)).await;
  107. },
  108. Err(e) => {
  109. dbg!(e);
  110. }
  111. }
  112. }
  113. }
  114. },
  115. Err(e) => {
  116. dbg!(e);
  117. dbg!(message);
  118. }
  119. }
  120. }
  121. Err(_) => {
  122. let _ = output.send(Event::Disconnected).await;
  123. state = State::Disconnected;
  124. }
  125. Ok(_) => continue,
  126. }
  127. }
  128. _ = tokio::time::sleep(buffer_flush_interval).fuse() => {
  129. if !trades_buffer.is_empty() {
  130. let _ = output.send(Event::TradeReceived(std::mem::take(&mut trades_buffer))).await;
  131. }
  132. }
  133. }
  134. }
  135. }
  136. }
  137. },
  138. )
  139. }
  140. #[derive(Debug)]
  141. #[allow(clippy::large_enum_variant)]
  142. enum State {
  143. Disconnected,
  144. Connected(
  145. async_tungstenite::WebSocketStream<
  146. async_tungstenite::tokio::ConnectStream,
  147. >,
  148. ),
  149. }
  150. #[derive(Debug, Clone)]
  151. pub enum Event {
  152. Connected(Connection),
  153. Disconnected,
  154. TradeReceived(Vec<Trade>),
  155. KlineReceived(Kline),
  156. }
  157. #[derive(Debug, Clone)]
  158. pub struct Connection(mpsc::Sender<String>);