china_futures.rs 22 KB


  1. use tracing::warn;
  2. use crate::data_providers::{deserialize_string_to_f32, SpawnExecutor};
  3. use crate::data_providers::deserialize_string_to_i64;
  4. use std::collections::HashMap;
  5. use std::vec;
  6. use bytes::Bytes;
  7. use iced::{
  8. stream,
  9. futures::{sink::SinkExt, Stream},
  10. };
  11. use regex::Regex;
  12. use serde_json::json;
  13. use serde_json::Value;
  14. use sonic_rs::{Deserialize, JsonValueTrait, Serialize};
  15. use sonic_rs::to_object_iter_unchecked;
  16. use fastwebsockets::{Frame, FragmentCollector, OpCode};
  17. use http_body_util::Empty;
  18. use hyper::header::{CONNECTION, UPGRADE};
  19. use hyper::Request;
  20. use hyper::upgrade::Upgraded;
  21. use hyper_util::rt::TokioIo;
  22. use tokio::net::TcpStream;
  23. use crate::data_providers::{
  24. Connection, Event, Kline, LocalDepthCache, MarketType, Order, State,
  25. StreamError, TickerInfo, TickerStats, Trade, VecLocalDepthCache,
  26. };
  27. use crate::{Ticker, Timeframe};
  28. use super::str_f32_parse;
  29. use super::OpenInterest;
  30. use tracing::error;
  31. #[derive(Serialize, Deserialize, Debug)]
  32. struct SonicDepth {
  33. #[serde(rename = "time")]
  34. pub update_id: u64,
  35. #[serde(rename = "bids")]
  36. pub bids: Vec<BidAsk>,
  37. #[serde(rename = "asks")]
  38. pub asks: Vec<BidAsk>,
  39. }
  40. #[derive(Serialize, Deserialize, Debug)]
  41. struct BidAsk {
  42. #[serde(rename = "0")]
  43. pub price: String,
  44. #[serde(rename = "1")]
  45. pub qty: String,
  46. }
  47. #[derive(Serialize, Deserialize, Debug)]
  48. struct SonicTrade {
  49. #[serde(rename = "time")]
  50. pub time: u64,
  51. #[serde(rename = "last_price")]
  52. pub price: String,
  53. #[serde(rename = "last_qty")]
  54. pub qty: String,
  55. #[serde(rename = "side")]
  56. pub side: String,
  57. }
  58. #[derive(Debug)]
  59. enum StreamData {
  60. Trade(Vec<SonicTrade>),
  61. Depth(SonicDepth, i64),
  62. // Kline(Ticker, Vec<SonicKline>),
  63. }
  64. #[allow(unused_assignments)]
  65. fn feed_de(
  66. slice: &[u8]
  67. ) -> Result<Vec<StreamData>, StreamError> {
  68. // // 这里应该是做缓存,之前的bybit数据推送是增量的
  69. // let mut depth_wrap: Option<SonicDepth> = None;
  70. let iter: sonic_rs::ObjectJsonIter = unsafe { to_object_iter_unchecked(slice) };
  71. let mut trade = SonicTrade {
  72. time: 0,
  73. price: "".to_string(),
  74. qty: "".to_string(),
  75. side: "".to_string(),
  76. };
  77. let mut depth = SonicDepth {
  78. update_id: 0,
  79. bids: vec![],
  80. asks: vec![],
  81. };
  82. for elem in iter {
  83. let (k, v) = elem.map_err(|e| StreamError::ParseError(e.to_string()))?;
  84. // let v = &v.as_raw_faststr();
  85. match k.as_str() {
  86. "time" => {
  87. let t: u64 = v.as_u64().unwrap();
  88. trade.time = t;
  89. depth.update_id = t;
  90. }
  91. "last_price" => {
  92. trade.price = v.to_string();
  93. }
  94. "last_qty" => {
  95. trade.qty = v.to_string();
  96. }
  97. "side" => {
  98. trade.side = v.to_string();
  99. }
  100. "asks" => {
  101. let asks = serde_json::from_slice::<Value>(v.as_raw_str().as_bytes())
  102. .unwrap();
  103. for ask in asks.as_array().unwrap() {
  104. let order_book = BidAsk {
  105. price: ask.as_array().unwrap()[0].to_string(),
  106. qty: ask.as_array().unwrap()[1].to_string(),
  107. };
  108. depth.asks.push(order_book);
  109. }
  110. }
  111. "bids" => {
  112. let bids = serde_json::from_slice::<Value>(v.as_raw_str().as_bytes())
  113. .unwrap();
  114. for bid in bids.as_array().unwrap() {
  115. let order_book = BidAsk {
  116. price: bid.as_array().unwrap()[0].to_string(),
  117. qty: bid.as_array().unwrap()[1].to_string(),
  118. };
  119. depth.bids.push(order_book);
  120. }
  121. }
  122. &_ => {}
  123. }
  124. }
  125. let update_id = depth.update_id;
  126. Ok(vec![StreamData::Trade(vec![trade]), StreamData::Depth(depth, update_id as i64)])
  127. }
  128. async fn connect() -> Result<FragmentCollector<TokioIo<Upgraded>>, StreamError> {
  129. let url = "ws://localhost:6789";
  130. let addr = "localhost:6789";
  131. let stream = TcpStream::connect(&addr).await
  132. .map_err(|e| StreamError::WebsocketError(e.to_string()))?;
  133. // 2. 构建 WebSocket 握手请求
  134. let req = Request::builder()
  135. .method("GET")
  136. .uri(url)
  137. .header("Host", "localhost")
  138. .header(UPGRADE, "websocket")
  139. .header(CONNECTION, "upgrade")
  140. .header("Sec-WebSocket-Key", fastwebsockets::handshake::generate_key())
  141. .header("Sec-WebSocket-Version", "13")
  142. .header("Sec-WebSocket-Protocol", "rust-websocket") // 可选协议
  143. .header("User-Agent", "rust-client/1.0") // 添加 UA 头
  144. .body(Empty::<Bytes>::new())
  145. .map_err(|e| StreamError::WebsocketError(e.to_string()))?;
  146. let (ws, _) = fastwebsockets::handshake::client(&SpawnExecutor, req, stream)
  147. .await
  148. .map_err(|e| StreamError::WebsocketError(e.to_string()))?;
  149. Ok(FragmentCollector::new(ws))
  150. }
  151. async fn try_connect(
  152. streams: &Value,
  153. output: &mut futures::channel::mpsc::Sender<Event>,
  154. ) -> State {
  155. match connect().await {
  156. Ok(mut websocket) => {
  157. if let Err(e) = websocket
  158. .write_frame(Frame::text(fastwebsockets::Payload::Borrowed(
  159. streams.to_string().as_bytes(),
  160. )))
  161. .await
  162. {
  163. let _ = output
  164. .send(Event::Disconnected(format!("Failed subscribing: {e}")))
  165. .await;
  166. return State::Disconnected;
  167. }
  168. let _ = output.send(Event::Connected(Connection)).await;
  169. State::Connected(websocket)
  170. }
  171. Err(err) => {
  172. tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
  173. let _ = output
  174. .send(Event::Disconnected(format!("Failed to connect: {err}")))
  175. .await;
  176. State::Disconnected
  177. }
  178. }
  179. }
  180. pub fn connect_market_stream(ticker: Ticker) -> impl Stream<Item = Event> {
  181. stream::channel(100, move |mut output| async move {
  182. let mut state: State = State::Disconnected;
  183. let (symbol_str, market_type) = ticker.get_string();
  184. let stream_1 = format!("publicTrade.{symbol_str}");
  185. let stream_2 = format!(
  186. "orderbook.{}.{}",
  187. match market_type {
  188. MarketType::Spot => "200",
  189. MarketType::LinearPerps => "500",
  190. },
  191. symbol_str,
  192. );
  193. let subscribe_message = json!({
  194. "op": "subscribe",
  195. "args": [stream_1, stream_2]
  196. });
  197. let mut trades_buffer: Vec<Trade> = Vec::new();
  198. let mut orderbook = LocalDepthCache::new();
  199. loop {
  200. match &mut state {
  201. State::Disconnected => {
  202. state = try_connect(
  203. &subscribe_message,
  204. &mut output
  205. ).await;
  206. }
  207. State::Connected(websocket) => match websocket.read_frame().await {
  208. Ok(msg) => match msg.opcode {
  209. OpCode::Text => {
  210. let result = feed_de(&msg.payload[..]);
  211. match result {
  212. Ok(data_vec) => {
  213. let trade_handle_rst = &data_vec[0];
  214. let depth_handle_rst = &data_vec[1];
  215. match trade_handle_rst {
  216. StreamData::Trade(de_trade_vec) => {
  217. for de_trade in de_trade_vec {
  218. let trade = Trade {
  219. time: de_trade.time as i64,
  220. is_sell: de_trade.side == "sell",
  221. price: str_f32_parse(&de_trade.price),
  222. qty: str_f32_parse(&de_trade.qty),
  223. };
  224. trades_buffer.push(trade);
  225. }
  226. }
  227. StreamData::Depth(_, _) => {}
  228. }
  229. match depth_handle_rst {
  230. StreamData::Depth(de_depth, time) => {
  231. let t = time.clone();
  232. let depth_update = VecLocalDepthCache {
  233. last_update_id: de_depth.update_id as i64,
  234. time: t,
  235. bids: de_depth
  236. .bids
  237. .iter()
  238. .map(|x| Order {
  239. price: str_f32_parse(&x.price),
  240. qty: str_f32_parse(&x.qty),
  241. })
  242. .collect(),
  243. asks: de_depth
  244. .asks
  245. .iter()
  246. .map(|x| Order {
  247. price: str_f32_parse(&x.price),
  248. qty: str_f32_parse(&x.qty),
  249. })
  250. .collect(),
  251. };
  252. orderbook.fetched(&depth_update);
  253. let _ = output
  254. .send(Event::DepthReceived(
  255. ticker,
  256. t,
  257. orderbook.get_depth(),
  258. std::mem::take(&mut trades_buffer).into_boxed_slice(),
  259. ))
  260. .await;
  261. }
  262. StreamData::Trade(_) => {}
  263. }
  264. }
  265. Err(e) => {
  266. // 处理错误
  267. error!("处理数据失败: {}", e);
  268. }
  269. }
  270. }
  271. OpCode::Close => {
  272. state = State::Disconnected;
  273. let _ = output
  274. .send(Event::Disconnected("Connection closed".to_string()))
  275. .await;
  276. }
  277. _ => {}
  278. },
  279. Err(e) => {
  280. state = State::Disconnected;
  281. let _ = output
  282. .send(Event::Disconnected(
  283. "Error reading frame: ".to_string() + &e.to_string(),
  284. ))
  285. .await;
  286. }
  287. },
  288. }
  289. }
  290. })
  291. }
  292. // fn string_to_timeframe(interval: &str) -> Option<Timeframe> {
  293. // Timeframe::ALL
  294. // .iter()
  295. // .find(|&tf| tf.to_minutes().to_string() == interval)
  296. // .copied()
  297. // }
  298. #[derive(Debug, Clone, Copy, PartialEq, Deserialize)]
  299. #[serde(rename_all = "camelCase")]
  300. struct DeOpenInterest {
  301. #[serde(rename = "openInterest", deserialize_with = "deserialize_string_to_f32")]
  302. pub value: f32,
  303. #[serde(deserialize_with = "deserialize_string_to_i64")]
  304. pub timestamp: i64,
  305. }
  306. pub async fn fetch_historical_oi(
  307. ticker: Ticker,
  308. range: Option<(i64, i64)>,
  309. period: Timeframe,
  310. ) -> Result<Vec<OpenInterest>, StreamError> {
  311. let ticker_str = ticker.get_string().0.to_uppercase();
  312. let period_str = match period {
  313. Timeframe::M5 => "5min",
  314. Timeframe::M15 => "15min",
  315. Timeframe::M30 => "30min",
  316. Timeframe::H1 => "1h",
  317. Timeframe::H2 => "2h",
  318. Timeframe::H4 => "4h",
  319. _ => {
  320. let err_msg = format!("Unsupported timeframe for open interest: {}", period);
  321. error!("{}", err_msg);
  322. return Err(StreamError::UnknownError(err_msg));
  323. }
  324. };
  325. let mut url = format!(
  326. "https://api.bybit.com/v5/market/open-interest?category=linear&symbol={}&intervalTime={}",
  327. ticker_str, period_str,
  328. );
  329. if let Some((start, end)) = range {
  330. let interval_ms = period.to_milliseconds() as i64;
  331. let num_intervals = ((end - start) / interval_ms).min(200);
  332. if num_intervals > 1 {
  333. url.push_str(&format!(
  334. "&startTime={start}&endTime={end}&limit={num_intervals}"
  335. ));
  336. } else {
  337. url.push_str("&limit=200");
  338. }
  339. } else {
  340. url.push_str("&limit=200");
  341. }
  342. let response = reqwest::get(&url)
  343. .await
  344. .map_err(|e| {
  345. error!("Failed to fetch from {}: {}", url, e);
  346. StreamError::FetchError(e)
  347. })?;
  348. let text = response.text()
  349. .await
  350. .map_err(|e| {
  351. error!("Failed to get response text from {}: {}", url, e);
  352. StreamError::FetchError(e)
  353. })?;
  354. let content: Value = sonic_rs::from_str(&text)
  355. .map_err(|e| {
  356. error!("Failed to parse JSON from {}: {}\nResponse: {}", url, e, text);
  357. StreamError::ParseError(e.to_string())
  358. })?;
  359. let result_list = content["result"]["list"]
  360. .as_array()
  361. .ok_or_else(|| {
  362. error!("Result list is not an array in response: {}", text);
  363. StreamError::ParseError("Result list is not an array".to_string())
  364. })?;
  365. let bybit_oi: Vec<DeOpenInterest> = serde_json::from_value(json!(result_list))
  366. .map_err(|e| {
  367. error!("Failed to parse open interest array: {}\nResponse: {}", e, text);
  368. StreamError::ParseError(format!("Failed to parse open interest: {e}"))
  369. })?;
  370. let open_interest: Vec<OpenInterest> = bybit_oi
  371. .into_iter()
  372. .map(|x| OpenInterest {
  373. time: x.timestamp,
  374. value: x.value,
  375. })
  376. .collect();
  377. if open_interest.is_empty() {
  378. warn!("No open interest data found for {}, from url: {}", ticker_str, url);
  379. }
  380. Ok(open_interest)
  381. }
  382. #[allow(dead_code)]
  383. #[derive(Deserialize, Debug)]
  384. struct ApiResponse {
  385. #[serde(rename = "retCode")]
  386. ret_code: u32,
  387. #[serde(rename = "retMsg")]
  388. ret_msg: String,
  389. result: ApiResult,
  390. }
  391. #[allow(dead_code)]
  392. #[derive(Deserialize, Debug)]
  393. struct ApiResult {
  394. symbol: String,
  395. category: String,
  396. list: Vec<Vec<Value>>,
  397. }
  398. pub async fn fetch_klines(
  399. ticker: Ticker,
  400. timeframe: Timeframe,
  401. range: Option<(i64, i64)>,
  402. ) -> Result<Vec<Kline>, StreamError> {
  403. let (symbol_str, market_type) = &ticker.get_string();
  404. let timeframe_str = timeframe.to_minutes().to_string();
  405. fn parse_kline_field<T: std::str::FromStr>(field: Option<&str>) -> Result<T, StreamError> {
  406. field
  407. .ok_or_else(|| StreamError::ParseError("Failed to parse kline".to_string()))
  408. .and_then(|s| {
  409. s.parse::<T>()
  410. .map_err(|_| StreamError::ParseError("Failed to parse kline".to_string()))
  411. })
  412. }
  413. let market = match market_type {
  414. MarketType::Spot => "spot",
  415. MarketType::LinearPerps => "linear",
  416. };
  417. let mut url = format!(
  418. "https://api.bybit.com/v5/market/kline?category={}&symbol={}&interval={}",
  419. market, symbol_str.to_uppercase(), timeframe_str
  420. );
  421. if let Some((start, end)) = range {
  422. let interval_ms = timeframe.to_milliseconds() as i64;
  423. let num_intervals = ((end - start) / interval_ms).min(1000);
  424. url.push_str(&format!("&start={start}&end={end}&limit={num_intervals}"));
  425. } else {
  426. url.push_str(&format!("&limit={}", 200));
  427. }
  428. let response: reqwest::Response = reqwest::get(&url).await.map_err(StreamError::FetchError)?;
  429. let text = response.text().await.map_err(StreamError::FetchError)?;
  430. let api_response: ApiResponse =
  431. sonic_rs::from_str(&text).map_err(|e| StreamError::ParseError(e.to_string()))?;
  432. let klines: Result<Vec<Kline>, StreamError> = api_response
  433. .result
  434. .list
  435. .iter()
  436. .map(|kline| {
  437. let time = parse_kline_field::<u64>(kline[0].as_str())?;
  438. let open = parse_kline_field::<f32>(kline[1].as_str())?;
  439. let high = parse_kline_field::<f32>(kline[2].as_str())?;
  440. let low = parse_kline_field::<f32>(kline[3].as_str())?;
  441. let close = parse_kline_field::<f32>(kline[4].as_str())?;
  442. let volume = parse_kline_field::<f32>(kline[5].as_str())?;
  443. Ok(Kline {
  444. time,
  445. open,
  446. high,
  447. low,
  448. close,
  449. volume: (-1.0, volume),
  450. })
  451. })
  452. .collect();
  453. klines
  454. }
  455. pub async fn fetch_ticksize(market_type: MarketType) -> Result<HashMap<Ticker, Option<TickerInfo>>, StreamError> {
  456. let market = match market_type {
  457. MarketType::Spot => "spot",
  458. MarketType::LinearPerps => "linear",
  459. };
  460. let url = format!("https://api.bybit.com/v5/market/instruments-info?category={market}");
  461. let response = reqwest::get(&url).await.map_err(StreamError::FetchError)?;
  462. let text = response.text().await.map_err(StreamError::FetchError)?;
  463. let exchange_info: Value =
  464. sonic_rs::from_str(&text).map_err(|e| StreamError::ParseError(e.to_string()))?;
  465. let result_list: &Vec<Value> = exchange_info["result"]["list"]
  466. .as_array()
  467. .ok_or_else(|| StreamError::ParseError("Result list is not an array".to_string()))?;
  468. let mut ticker_info_map = HashMap::new();
  469. let re = Regex::new(r"^[a-zA-Z0-9]+$").unwrap();
  470. for item in result_list {
  471. let symbol = item["symbol"]
  472. .as_str()
  473. .ok_or_else(|| StreamError::ParseError("Symbol not found".to_string()))?;
  474. if !re.is_match(symbol) {
  475. continue;
  476. }
  477. let price_filter = item["priceFilter"]
  478. .as_object()
  479. .ok_or_else(|| StreamError::ParseError("Price filter not found".to_string()))?;
  480. let min_ticksize = price_filter["tickSize"]
  481. .as_str()
  482. .ok_or_else(|| StreamError::ParseError("Tick size not found".to_string()))?
  483. .parse::<f32>()
  484. .map_err(|_| StreamError::ParseError("Failed to parse tick size".to_string()))?;
  485. let ticker = Ticker::new(symbol, market_type);
  486. ticker_info_map.insert(ticker, Some(TickerInfo { min_ticksize, ticker }));
  487. }
  488. Ok(ticker_info_map)
  489. }
  490. pub async fn fetch_ticker_prices(market_type: MarketType) -> Result<HashMap<Ticker, TickerStats>, StreamError> {
  491. let market = match market_type {
  492. MarketType::Spot => "spot",
  493. MarketType::LinearPerps => "linear",
  494. };
  495. let url = format!("https://api.bybit.com/v5/market/tickers?category={market}");
  496. let response = reqwest::get(&url).await.map_err(StreamError::FetchError)?;
  497. let text = response.text().await.map_err(StreamError::FetchError)?;
  498. let exchange_info: Value =
  499. sonic_rs::from_str(&text).map_err(|e| StreamError::ParseError(e.to_string()))?;
  500. let result_list: &Vec<Value> = exchange_info["result"]["list"]
  501. .as_array()
  502. .ok_or_else(|| StreamError::ParseError("Result list is not an array".to_string()))?;
  503. let mut ticker_prices_map = HashMap::new();
  504. let re = Regex::new(r"^[a-zA-Z0-9]+$").unwrap();
  505. for item in result_list {
  506. let symbol = item["symbol"]
  507. .as_str()
  508. .ok_or_else(|| StreamError::ParseError("Symbol not found".to_string()))?;
  509. if !re.is_match(symbol) {
  510. continue;
  511. }
  512. let mark_price = item["lastPrice"]
  513. .as_str()
  514. .ok_or_else(|| StreamError::ParseError("Mark price not found".to_string()))?
  515. .parse::<f32>()
  516. .map_err(|_| StreamError::ParseError("Failed to parse mark price".to_string()))?;
  517. let daily_price_chg = item["price24hPcnt"]
  518. .as_str()
  519. .ok_or_else(|| StreamError::ParseError("Daily price change not found".to_string()))?
  520. .parse::<f32>()
  521. .map_err(|_| {
  522. StreamError::ParseError("Failed to parse daily price change".to_string())
  523. })?;
  524. let daily_volume = item["volume24h"]
  525. .as_str()
  526. .ok_or_else(|| StreamError::ParseError("Daily volume not found".to_string()))?
  527. .parse::<f32>()
  528. .map_err(|_| StreamError::ParseError("Failed to parse daily volume".to_string()))?;
  529. let quote_volume = daily_volume * mark_price;
  530. if quote_volume < 4_000_000.0 {
  531. continue;
  532. }
  533. let ticker_stats = TickerStats {
  534. mark_price,
  535. daily_price_chg: daily_price_chg * 100.0,
  536. daily_volume: quote_volume,
  537. };
  538. ticker_prices_map.insert(Ticker::new(symbol, market_type), ticker_stats);
  539. }
  540. Ok(ticker_prices_map)
  541. }