china_futures.rs 21 KB


  1. use tracing::{info, warn};
  2. use crate::data_providers::{deserialize_string_to_f32, SpawnExecutor};
  3. use crate::data_providers::deserialize_string_to_i64;
  4. use std::collections::HashMap;
  5. use std::vec;
  6. use bytes::Bytes;
  7. use iced::{
  8. stream,
  9. futures::{sink::SinkExt, Stream},
  10. };
  11. use regex::Regex;
  12. use serde_json::json;
  13. use serde_json::Value;
  14. use sonic_rs::{Deserialize, JsonValueTrait, Serialize};
  15. use sonic_rs::to_object_iter_unchecked;
  16. use fastwebsockets::{Frame, FragmentCollector, OpCode};
  17. use http_body_util::Empty;
  18. use hyper::header::{CONNECTION, UPGRADE};
  19. use hyper::Request;
  20. use hyper::upgrade::Upgraded;
  21. use hyper_util::rt::TokioIo;
  22. use tokio::net::TcpStream;
  23. use crate::data_providers::{
  24. Connection, Event, Kline, LocalDepthCache, MarketType, Order, State,
  25. StreamError, TickerInfo, TickerStats, Trade, VecLocalDepthCache,
  26. };
  27. use crate::{Ticker, Timeframe};
  28. use super::str_f32_parse;
  29. use super::OpenInterest;
  30. use tracing::{error};
  31. #[derive(Serialize, Deserialize, Debug)]
  32. struct SonicDepth {
  33. #[serde(rename = "time")]
  34. pub update_id: u64,
  35. #[serde(rename = "bids")]
  36. pub bids: Vec<BidAsk>,
  37. #[serde(rename = "asks")]
  38. pub asks: Vec<BidAsk>,
  39. }
  40. #[derive(Serialize, Deserialize, Debug)]
  41. struct BidAsk {
  42. #[serde(rename = "0")]
  43. pub price: String,
  44. #[serde(rename = "1")]
  45. pub qty: String,
  46. }
  47. #[derive(Serialize, Deserialize, Debug)]
  48. struct SonicTrade {
  49. #[serde(rename = "time")]
  50. pub time: u64,
  51. #[serde(rename = "last_price")]
  52. pub price: String,
  53. #[serde(rename = "last_qty")]
  54. pub qty: String,
  55. #[serde(rename = "side")]
  56. pub side: String,
  57. }
  58. #[derive(Debug)]
  59. enum StreamData {
  60. Trade(Vec<SonicTrade>),
  61. Depth(SonicDepth, i64),
  62. // Kline(Ticker, Vec<SonicKline>),
  63. }
  64. #[allow(unused_assignments)]
  65. fn feed_de(
  66. slice: &[u8]
  67. ) -> Result<Vec<StreamData>, StreamError> {
  68. // // 这里应该是做缓存,之前的bybit数据推送是增量的
  69. // let mut depth_wrap: Option<SonicDepth> = None;
  70. let iter: sonic_rs::ObjectJsonIter = unsafe { to_object_iter_unchecked(slice) };
  71. let mut trade = SonicTrade {
  72. time: 0,
  73. price: "".to_string(),
  74. qty: "".to_string(),
  75. side: "".to_string(),
  76. };
  77. let mut depth = SonicDepth {
  78. update_id: 0,
  79. bids: vec![],
  80. asks: vec![],
  81. };
  82. for elem in iter {
  83. let (k, v) = elem.map_err(|e| StreamError::ParseError(e.to_string()))?;
  84. // let v = &v.as_raw_faststr();
  85. match k.as_str() {
  86. "time" => {
  87. let t: u64 = v.as_u64().unwrap();
  88. trade.time = t;
  89. depth.update_id = t;
  90. }
  91. "last_price" => {
  92. trade.price = v.to_string();
  93. }
  94. "last_qty" => {
  95. trade.qty = v.to_string();
  96. }
  97. "side" => {
  98. trade.side = v.to_string();
  99. }
  100. "asks" => {
  101. let asks = serde_json::from_slice::<Value>(v.as_raw_str().as_bytes())
  102. .unwrap();
  103. for ask in asks.as_array().unwrap() {
  104. let order_book = BidAsk {
  105. price: ask.as_array().unwrap()[0].to_string(),
  106. qty: ask.as_array().unwrap()[1].to_string(),
  107. };
  108. depth.asks.push(order_book);
  109. }
  110. }
  111. "bids" => {
  112. let bids = serde_json::from_slice::<Value>(v.as_raw_str().as_bytes())
  113. .unwrap();
  114. for bid in bids.as_array().unwrap() {
  115. let order_book = BidAsk {
  116. price: bid.as_array().unwrap()[0].to_string(),
  117. qty: bid.as_array().unwrap()[1].to_string(),
  118. };
  119. depth.bids.push(order_book);
  120. }
  121. }
  122. &_ => {}
  123. }
  124. }
  125. let update_id = depth.update_id;
  126. Ok(vec![StreamData::Trade(vec![trade]), StreamData::Depth(depth, update_id as i64)])
  127. }
  128. async fn connect() -> Result<FragmentCollector<TokioIo<Upgraded>>, StreamError> {
  129. let url = "ws://localhost:6789";
  130. let addr = "localhost:6789";
  131. let stream = TcpStream::connect(&addr).await
  132. .map_err(|e| StreamError::WebsocketError(e.to_string()))?;
  133. // 2. 构建 WebSocket 握手请求
  134. let req = Request::builder()
  135. .method("GET")
  136. .uri(url)
  137. .header("Host", "localhost")
  138. .header(UPGRADE, "websocket")
  139. .header(CONNECTION, "upgrade")
  140. .header("Sec-WebSocket-Key", fastwebsockets::handshake::generate_key())
  141. .header("Sec-WebSocket-Version", "13")
  142. .header("Sec-WebSocket-Protocol", "rust-websocket") // 可选协议
  143. .header("User-Agent", "rust-client/1.0") // 添加 UA 头
  144. .body(Empty::<Bytes>::new())
  145. .map_err(|e| StreamError::WebsocketError(e.to_string()))?;
  146. let (ws, _) = fastwebsockets::handshake::client(&SpawnExecutor, req, stream)
  147. .await
  148. .map_err(|e| StreamError::WebsocketError(e.to_string()))?;
  149. Ok(FragmentCollector::new(ws))
  150. }
  151. async fn try_connect(
  152. streams: &Value,
  153. output: &mut futures::channel::mpsc::Sender<Event>,
  154. ) -> State {
  155. match connect().await {
  156. Ok(mut websocket) => {
  157. if let Err(e) = websocket
  158. .write_frame(Frame::text(fastwebsockets::Payload::Borrowed(
  159. streams.to_string().as_bytes(),
  160. )))
  161. .await
  162. {
  163. let _ = output
  164. .send(Event::Disconnected(format!("Failed subscribing: {e}")))
  165. .await;
  166. return State::Disconnected;
  167. }
  168. let _ = output.send(Event::Connected(Connection)).await;
  169. State::Connected(websocket)
  170. }
  171. Err(err) => {
  172. tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
  173. let _ = output
  174. .send(Event::Disconnected(format!("Failed to connect: {err}")))
  175. .await;
  176. State::Disconnected
  177. }
  178. }
  179. }
  180. pub fn connect_market_stream(ticker: Ticker) -> impl Stream<Item = Event> {
  181. stream::channel(100, move |mut output| async move {
  182. let mut state: State = State::Disconnected;
  183. let (symbol_str, market_type) = ticker.get_string();
  184. let stream_1 = format!("publicTrade.{symbol_str}");
  185. let stream_2 = format!(
  186. "orderbook.{}.{}",
  187. match market_type {
  188. MarketType::Spot => "200",
  189. MarketType::LinearPerps => "500",
  190. },
  191. symbol_str,
  192. );
  193. let subscribe_message = json!({
  194. "op": "subscribe",
  195. "args": [stream_1, stream_2]
  196. });
  197. let mut trades_buffer: Vec<Trade> = Vec::new();
  198. let mut orderbook = LocalDepthCache::new();
  199. loop {
  200. match &mut state {
  201. State::Disconnected => {
  202. state = try_connect(
  203. &subscribe_message,
  204. &mut output
  205. ).await;
  206. }
  207. State::Connected(websocket) => match websocket.read_frame().await {
  208. Ok(msg) => match msg.opcode {
  209. OpCode::Text => {
  210. let result = feed_de(&msg.payload[..]);
  211. match result {
  212. Ok(data_vec) => {
  213. let trade_handle_rst = &data_vec[0];
  214. let depth_handle_rst = &data_vec[1];
  215. match trade_handle_rst {
  216. StreamData::Trade(de_trade_vec) => {
  217. for de_trade in de_trade_vec {
  218. let trade = Trade {
  219. time: de_trade.time as i64,
  220. is_sell: de_trade.side == "sell",
  221. price: str_f32_parse(&de_trade.price),
  222. qty: str_f32_parse(&de_trade.qty),
  223. };
  224. trades_buffer.push(trade);
  225. }
  226. }
  227. StreamData::Depth(_, _) => {}
  228. }
  229. match depth_handle_rst {
  230. StreamData::Depth(de_depth, time) => {
  231. let t = time.clone();
  232. let depth_update = VecLocalDepthCache {
  233. last_update_id: de_depth.update_id as i64,
  234. time: t,
  235. bids: de_depth
  236. .bids
  237. .iter()
  238. .map(|x| Order {
  239. price: str_f32_parse(&x.price),
  240. qty: str_f32_parse(&x.qty),
  241. })
  242. .collect(),
  243. asks: de_depth
  244. .asks
  245. .iter()
  246. .map(|x| Order {
  247. price: str_f32_parse(&x.price),
  248. qty: str_f32_parse(&x.qty),
  249. })
  250. .collect(),
  251. };
  252. orderbook.fetched(&depth_update);
  253. }
  254. StreamData::Trade(_) => {}
  255. }
  256. }
  257. Err(e) => {
  258. // 处理错误
  259. error!("处理数据失败: {}", e);
  260. }
  261. }
  262. }
  263. OpCode::Close => {
  264. state = State::Disconnected;
  265. let _ = output
  266. .send(Event::Disconnected("Connection closed".to_string()))
  267. .await;
  268. }
  269. _ => {}
  270. },
  271. Err(e) => {
  272. state = State::Disconnected;
  273. let _ = output
  274. .send(Event::Disconnected(
  275. "Error reading frame: ".to_string() + &e.to_string(),
  276. ))
  277. .await;
  278. }
  279. },
  280. }
  281. }
  282. })
  283. }
  284. // fn string_to_timeframe(interval: &str) -> Option<Timeframe> {
  285. // Timeframe::ALL
  286. // .iter()
  287. // .find(|&tf| tf.to_minutes().to_string() == interval)
  288. // .copied()
  289. // }
  290. #[derive(Debug, Clone, Copy, PartialEq, Deserialize)]
  291. #[serde(rename_all = "camelCase")]
  292. struct DeOpenInterest {
  293. #[serde(rename = "openInterest", deserialize_with = "deserialize_string_to_f32")]
  294. pub value: f32,
  295. #[serde(deserialize_with = "deserialize_string_to_i64")]
  296. pub timestamp: i64,
  297. }
  298. pub async fn fetch_historical_oi(
  299. ticker: Ticker,
  300. range: Option<(i64, i64)>,
  301. period: Timeframe,
  302. ) -> Result<Vec<OpenInterest>, StreamError> {
  303. let ticker_str = ticker.get_string().0.to_uppercase();
  304. let period_str = match period {
  305. Timeframe::M5 => "5min",
  306. Timeframe::M15 => "15min",
  307. Timeframe::M30 => "30min",
  308. Timeframe::H1 => "1h",
  309. Timeframe::H2 => "2h",
  310. Timeframe::H4 => "4h",
  311. _ => {
  312. let err_msg = format!("Unsupported timeframe for open interest: {}", period);
  313. error!("{}", err_msg);
  314. return Err(StreamError::UnknownError(err_msg));
  315. }
  316. };
  317. let mut url = format!(
  318. "https://api.bybit.com/v5/market/open-interest?category=linear&symbol={}&intervalTime={}",
  319. ticker_str, period_str,
  320. );
  321. if let Some((start, end)) = range {
  322. let interval_ms = period.to_milliseconds() as i64;
  323. let num_intervals = ((end - start) / interval_ms).min(200);
  324. if num_intervals > 1 {
  325. url.push_str(&format!(
  326. "&startTime={start}&endTime={end}&limit={num_intervals}"
  327. ));
  328. } else {
  329. url.push_str("&limit=200");
  330. }
  331. } else {
  332. url.push_str("&limit=200");
  333. }
  334. let response = reqwest::get(&url)
  335. .await
  336. .map_err(|e| {
  337. error!("Failed to fetch from {}: {}", url, e);
  338. StreamError::FetchError(e)
  339. })?;
  340. let text = response.text()
  341. .await
  342. .map_err(|e| {
  343. error!("Failed to get response text from {}: {}", url, e);
  344. StreamError::FetchError(e)
  345. })?;
  346. let content: Value = sonic_rs::from_str(&text)
  347. .map_err(|e| {
  348. error!("Failed to parse JSON from {}: {}\nResponse: {}", url, e, text);
  349. StreamError::ParseError(e.to_string())
  350. })?;
  351. let result_list = content["result"]["list"]
  352. .as_array()
  353. .ok_or_else(|| {
  354. error!("Result list is not an array in response: {}", text);
  355. StreamError::ParseError("Result list is not an array".to_string())
  356. })?;
  357. let bybit_oi: Vec<DeOpenInterest> = serde_json::from_value(json!(result_list))
  358. .map_err(|e| {
  359. error!("Failed to parse open interest array: {}\nResponse: {}", e, text);
  360. StreamError::ParseError(format!("Failed to parse open interest: {e}"))
  361. })?;
  362. let open_interest: Vec<OpenInterest> = bybit_oi
  363. .into_iter()
  364. .map(|x| OpenInterest {
  365. time: x.timestamp,
  366. value: x.value,
  367. })
  368. .collect();
  369. if open_interest.is_empty() {
  370. warn!("No open interest data found for {}, from url: {}", ticker_str, url);
  371. }
  372. Ok(open_interest)
  373. }
  374. #[allow(dead_code)]
  375. #[derive(Deserialize, Debug)]
  376. struct ApiResponse {
  377. #[serde(rename = "retCode")]
  378. ret_code: u32,
  379. #[serde(rename = "retMsg")]
  380. ret_msg: String,
  381. result: ApiResult,
  382. }
  383. #[allow(dead_code)]
  384. #[derive(Deserialize, Debug)]
  385. struct ApiResult {
  386. symbol: String,
  387. category: String,
  388. list: Vec<Vec<Value>>,
  389. }
  390. pub async fn fetch_klines(
  391. ticker: Ticker,
  392. timeframe: Timeframe,
  393. range: Option<(i64, i64)>,
  394. ) -> Result<Vec<Kline>, StreamError> {
  395. let (symbol_str, market_type) = &ticker.get_string();
  396. let timeframe_str = timeframe.to_minutes().to_string();
  397. fn parse_kline_field<T: std::str::FromStr>(field: Option<&str>) -> Result<T, StreamError> {
  398. field
  399. .ok_or_else(|| StreamError::ParseError("Failed to parse kline".to_string()))
  400. .and_then(|s| {
  401. s.parse::<T>()
  402. .map_err(|_| StreamError::ParseError("Failed to parse kline".to_string()))
  403. })
  404. }
  405. let market = match market_type {
  406. MarketType::Spot => "spot",
  407. MarketType::LinearPerps => "linear",
  408. };
  409. let mut url = format!(
  410. "https://api.bybit.com/v5/market/kline?category={}&symbol={}&interval={}",
  411. market, symbol_str.to_uppercase(), timeframe_str
  412. );
  413. if let Some((start, end)) = range {
  414. let interval_ms = timeframe.to_milliseconds() as i64;
  415. let num_intervals = ((end - start) / interval_ms).min(1000);
  416. url.push_str(&format!("&start={start}&end={end}&limit={num_intervals}"));
  417. } else {
  418. url.push_str(&format!("&limit={}", 200));
  419. }
  420. let response: reqwest::Response = reqwest::get(&url).await.map_err(StreamError::FetchError)?;
  421. let text = response.text().await.map_err(StreamError::FetchError)?;
  422. let api_response: ApiResponse =
  423. sonic_rs::from_str(&text).map_err(|e| StreamError::ParseError(e.to_string()))?;
  424. let klines: Result<Vec<Kline>, StreamError> = api_response
  425. .result
  426. .list
  427. .iter()
  428. .map(|kline| {
  429. let time = parse_kline_field::<u64>(kline[0].as_str())?;
  430. let open = parse_kline_field::<f32>(kline[1].as_str())?;
  431. let high = parse_kline_field::<f32>(kline[2].as_str())?;
  432. let low = parse_kline_field::<f32>(kline[3].as_str())?;
  433. let close = parse_kline_field::<f32>(kline[4].as_str())?;
  434. let volume = parse_kline_field::<f32>(kline[5].as_str())?;
  435. Ok(Kline {
  436. time,
  437. open,
  438. high,
  439. low,
  440. close,
  441. volume: (-1.0, volume),
  442. })
  443. })
  444. .collect();
  445. klines
  446. }
  447. pub async fn fetch_ticksize(market_type: MarketType) -> Result<HashMap<Ticker, Option<TickerInfo>>, StreamError> {
  448. let market = match market_type {
  449. MarketType::Spot => "spot",
  450. MarketType::LinearPerps => "linear",
  451. };
  452. let url = format!("https://api.bybit.com/v5/market/instruments-info?category={market}");
  453. let response = reqwest::get(&url).await.map_err(StreamError::FetchError)?;
  454. let text = response.text().await.map_err(StreamError::FetchError)?;
  455. let exchange_info: Value =
  456. sonic_rs::from_str(&text).map_err(|e| StreamError::ParseError(e.to_string()))?;
  457. let result_list: &Vec<Value> = exchange_info["result"]["list"]
  458. .as_array()
  459. .ok_or_else(|| StreamError::ParseError("Result list is not an array".to_string()))?;
  460. let mut ticker_info_map = HashMap::new();
  461. let re = Regex::new(r"^[a-zA-Z0-9]+$").unwrap();
  462. for item in result_list {
  463. let symbol = item["symbol"]
  464. .as_str()
  465. .ok_or_else(|| StreamError::ParseError("Symbol not found".to_string()))?;
  466. if !re.is_match(symbol) {
  467. continue;
  468. }
  469. let price_filter = item["priceFilter"]
  470. .as_object()
  471. .ok_or_else(|| StreamError::ParseError("Price filter not found".to_string()))?;
  472. let min_ticksize = price_filter["tickSize"]
  473. .as_str()
  474. .ok_or_else(|| StreamError::ParseError("Tick size not found".to_string()))?
  475. .parse::<f32>()
  476. .map_err(|_| StreamError::ParseError("Failed to parse tick size".to_string()))?;
  477. let ticker = Ticker::new(symbol, market_type);
  478. ticker_info_map.insert(ticker, Some(TickerInfo { min_ticksize, ticker }));
  479. }
  480. Ok(ticker_info_map)
  481. }
  482. pub async fn fetch_ticker_prices(market_type: MarketType) -> Result<HashMap<Ticker, TickerStats>, StreamError> {
  483. let market = match market_type {
  484. MarketType::Spot => "spot",
  485. MarketType::LinearPerps => "linear",
  486. };
  487. let url = format!("https://api.bybit.com/v5/market/tickers?category={market}");
  488. let response = reqwest::get(&url).await.map_err(StreamError::FetchError)?;
  489. let text = response.text().await.map_err(StreamError::FetchError)?;
  490. let exchange_info: Value =
  491. sonic_rs::from_str(&text).map_err(|e| StreamError::ParseError(e.to_string()))?;
  492. let result_list: &Vec<Value> = exchange_info["result"]["list"]
  493. .as_array()
  494. .ok_or_else(|| StreamError::ParseError("Result list is not an array".to_string()))?;
  495. let mut ticker_prices_map = HashMap::new();
  496. let re = Regex::new(r"^[a-zA-Z0-9]+$").unwrap();
  497. for item in result_list {
  498. let symbol = item["symbol"]
  499. .as_str()
  500. .ok_or_else(|| StreamError::ParseError("Symbol not found".to_string()))?;
  501. if !re.is_match(symbol) {
  502. continue;
  503. }
  504. let mark_price = item["lastPrice"]
  505. .as_str()
  506. .ok_or_else(|| StreamError::ParseError("Mark price not found".to_string()))?
  507. .parse::<f32>()
  508. .map_err(|_| StreamError::ParseError("Failed to parse mark price".to_string()))?;
  509. let daily_price_chg = item["price24hPcnt"]
  510. .as_str()
  511. .ok_or_else(|| StreamError::ParseError("Daily price change not found".to_string()))?
  512. .parse::<f32>()
  513. .map_err(|_| {
  514. StreamError::ParseError("Failed to parse daily price change".to_string())
  515. })?;
  516. let daily_volume = item["volume24h"]
  517. .as_str()
  518. .ok_or_else(|| StreamError::ParseError("Daily volume not found".to_string()))?
  519. .parse::<f32>()
  520. .map_err(|_| StreamError::ParseError("Failed to parse daily volume".to_string()))?;
  521. let quote_volume = daily_volume * mark_price;
  522. if quote_volume < 4_000_000.0 {
  523. continue;
  524. }
  525. let ticker_stats = TickerStats {
  526. mark_price,
  527. daily_price_chg: daily_price_chg * 100.0,
  528. daily_volume: quote_volume,
  529. };
  530. ticker_prices_map.insert(Ticker::new(symbol, market_type), ticker_stats);
  531. }
  532. Ok(ticker_prices_map)
  533. }