|
@@ -9,7 +9,6 @@ use iced::{
|
|
|
futures::{sink::SinkExt, Stream},
|
|
futures::{sink::SinkExt, Stream},
|
|
|
};
|
|
};
|
|
|
|
|
|
|
|
-use regex::Regex;
|
|
|
|
|
use serde_json::json;
|
|
use serde_json::json;
|
|
|
use serde_json::Value;
|
|
use serde_json::Value;
|
|
|
|
|
|
|
@@ -33,6 +32,7 @@ use super::str_f32_parse;
|
|
|
use super::OpenInterest;
|
|
use super::OpenInterest;
|
|
|
|
|
|
|
|
use tracing::error;
|
|
use tracing::error;
|
|
|
|
|
+use std::path::Path;
|
|
|
|
|
|
|
|
#[derive(Serialize, Deserialize, Debug)]
|
|
#[derive(Serialize, Deserialize, Debug)]
|
|
|
struct SonicDepth {
|
|
struct SonicDepth {
|
|
@@ -144,9 +144,9 @@ fn feed_de(
|
|
|
Ok(vec![StreamData::Trade(vec![trade]), StreamData::Depth(depth, update_id as i64)])
|
|
Ok(vec![StreamData::Trade(vec![trade]), StreamData::Depth(depth, update_id as i64)])
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-async fn connect() -> Result<FragmentCollector<TokioIo<Upgraded>>, StreamError> {
|
|
|
|
|
- let url = "ws://localhost:6789";
|
|
|
|
|
- let addr = "localhost:6789";
|
|
|
|
|
|
|
+async fn connect(port: String) -> Result<FragmentCollector<TokioIo<Upgraded>>, StreamError> {
|
|
|
|
|
+ let url = format!("ws://localhost:{port}");
|
|
|
|
|
+ let addr = format!("localhost:{port}");
|
|
|
let stream = TcpStream::connect(&addr).await
|
|
let stream = TcpStream::connect(&addr).await
|
|
|
.map_err(|e| StreamError::WebsocketError(e.to_string()))?;
|
|
.map_err(|e| StreamError::WebsocketError(e.to_string()))?;
|
|
|
|
|
|
|
@@ -174,8 +174,9 @@ async fn connect() -> Result<FragmentCollector<TokioIo<Upgraded>>, StreamError>
|
|
|
async fn try_connect(
|
|
async fn try_connect(
|
|
|
streams: &Value,
|
|
streams: &Value,
|
|
|
output: &mut futures::channel::mpsc::Sender<Event>,
|
|
output: &mut futures::channel::mpsc::Sender<Event>,
|
|
|
|
|
+ port: String
|
|
|
) -> State {
|
|
) -> State {
|
|
|
- match connect().await {
|
|
|
|
|
|
|
+ match connect(port).await {
|
|
|
Ok(mut websocket) => {
|
|
Ok(mut websocket) => {
|
|
|
if let Err(e) = websocket
|
|
if let Err(e) = websocket
|
|
|
.write_frame(Frame::text(fastwebsockets::Payload::Borrowed(
|
|
.write_frame(Frame::text(fastwebsockets::Payload::Borrowed(
|
|
@@ -208,6 +209,10 @@ pub fn connect_market_stream(ticker: Ticker) -> impl Stream<Item = Event> {
|
|
|
let mut state: State = State::Disconnected;
|
|
let mut state: State = State::Disconnected;
|
|
|
|
|
|
|
|
let (symbol_str, market_type) = ticker.get_string();
|
|
let (symbol_str, market_type) = ticker.get_string();
|
|
|
|
|
+ // 方法1:使用 split_once 更安全
|
|
|
|
|
+ let port = symbol_str.split_once('_')
|
|
|
|
|
+ .map(|(_, port)| port.to_string())
|
|
|
|
|
+ .unwrap(); // 提供默认值
|
|
|
|
|
|
|
|
let stream_1 = format!("publicTrade.{symbol_str}");
|
|
let stream_1 = format!("publicTrade.{symbol_str}");
|
|
|
let stream_2 = format!(
|
|
let stream_2 = format!(
|
|
@@ -232,7 +237,8 @@ pub fn connect_market_stream(ticker: Ticker) -> impl Stream<Item = Event> {
|
|
|
State::Disconnected => {
|
|
State::Disconnected => {
|
|
|
state = try_connect(
|
|
state = try_connect(
|
|
|
&subscribe_message,
|
|
&subscribe_message,
|
|
|
- &mut output
|
|
|
|
|
|
|
+ &mut output,
|
|
|
|
|
+ port.clone(),
|
|
|
).await;
|
|
).await;
|
|
|
}
|
|
}
|
|
|
State::Connected(websocket) => match websocket.read_frame().await {
|
|
State::Connected(websocket) => match websocket.read_frame().await {
|
|
@@ -520,48 +526,20 @@ pub async fn fetch_klines(
|
|
|
klines
|
|
klines
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+// 此方法获取ticker信息,以及全市场的pair信息,
|
|
|
pub async fn fetch_ticksize(market_type: MarketType) -> Result<HashMap<Ticker, Option<TickerInfo>>, StreamError> {
|
|
pub async fn fetch_ticksize(market_type: MarketType) -> Result<HashMap<Ticker, Option<TickerInfo>>, StreamError> {
|
|
|
- let market = match market_type {
|
|
|
|
|
- MarketType::Spot => "spot",
|
|
|
|
|
- MarketType::LinearPerps => "linear",
|
|
|
|
|
- };
|
|
|
|
|
-
|
|
|
|
|
- let url = format!("https://api.bybit.com/v5/market/instruments-info?category={market}");
|
|
|
|
|
- let response = reqwest::get(&url).await.map_err(StreamError::FetchError)?;
|
|
|
|
|
- let text = response.text().await.map_err(StreamError::FetchError)?;
|
|
|
|
|
-
|
|
|
|
|
- let exchange_info: Value =
|
|
|
|
|
- sonic_rs::from_str(&text).map_err(|e| StreamError::ParseError(e.to_string()))?;
|
|
|
|
|
-
|
|
|
|
|
- let result_list: &Vec<Value> = exchange_info["result"]["list"]
|
|
|
|
|
- .as_array()
|
|
|
|
|
- .ok_or_else(|| StreamError::ParseError("Result list is not an array".to_string()))?;
|
|
|
|
|
|
|
+ let path = Path::new("./ticker_info.json");
|
|
|
|
|
+ let content = std::fs::read_to_string(path)
|
|
|
|
|
+ .map_err(|e| StreamError::ParseError(format!("Failed to read file: {}", e)))?;
|
|
|
|
|
+ let ticker_json: Value = serde_json::from_str(&content)
|
|
|
|
|
+ .map_err(|e| StreamError::ParseError(format!("Failed to parse JSON: {}", e)))?;
|
|
|
|
|
|
|
|
let mut ticker_info_map = HashMap::new();
|
|
let mut ticker_info_map = HashMap::new();
|
|
|
-
|
|
|
|
|
- let re = Regex::new(r"^[a-zA-Z0-9]+$").unwrap();
|
|
|
|
|
-
|
|
|
|
|
- for item in result_list {
|
|
|
|
|
- let symbol = item["symbol"]
|
|
|
|
|
- .as_str()
|
|
|
|
|
- .ok_or_else(|| StreamError::ParseError("Symbol not found".to_string()))?;
|
|
|
|
|
-
|
|
|
|
|
- if !re.is_match(symbol) {
|
|
|
|
|
- continue;
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- let price_filter = item["priceFilter"]
|
|
|
|
|
- .as_object()
|
|
|
|
|
- .ok_or_else(|| StreamError::ParseError("Price filter not found".to_string()))?;
|
|
|
|
|
-
|
|
|
|
|
- let min_ticksize = price_filter["tickSize"]
|
|
|
|
|
- .as_str()
|
|
|
|
|
- .ok_or_else(|| StreamError::ParseError("Tick size not found".to_string()))?
|
|
|
|
|
- .parse::<f32>()
|
|
|
|
|
- .map_err(|_| StreamError::ParseError("Failed to parse tick size".to_string()))?;
|
|
|
|
|
|
|
+ for ticker in ticker_json.as_array().unwrap() {
|
|
|
|
|
+ let symbol = ticker["symbol"].as_str().unwrap();
|
|
|
|
|
+ let min_ticksize = ticker["tick_size"].as_f64().unwrap() as f32;
|
|
|
|
|
|
|
|
let ticker = Ticker::new(symbol, market_type);
|
|
let ticker = Ticker::new(symbol, market_type);
|
|
|
-
|
|
|
|
|
ticker_info_map.insert(ticker, Some(TickerInfo { min_ticksize, ticker }));
|
|
ticker_info_map.insert(ticker, Some(TickerInfo { min_ticksize, ticker }));
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -569,61 +547,21 @@ pub async fn fetch_ticksize(market_type: MarketType) -> Result<HashMap<Ticker, O
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
pub async fn fetch_ticker_prices(market_type: MarketType) -> Result<HashMap<Ticker, TickerStats>, StreamError> {
|
|
pub async fn fetch_ticker_prices(market_type: MarketType) -> Result<HashMap<Ticker, TickerStats>, StreamError> {
|
|
|
- let market = match market_type {
|
|
|
|
|
- MarketType::Spot => "spot",
|
|
|
|
|
- MarketType::LinearPerps => "linear",
|
|
|
|
|
- };
|
|
|
|
|
|
|
+ let path = Path::new("./ticker_info.json");
|
|
|
|
|
+ let content = std::fs::read_to_string(path)
|
|
|
|
|
+ .map_err(|e| StreamError::ParseError(format!("Failed to read file: {}", e)))?;
|
|
|
|
|
+ let ticker_json: Value = serde_json::from_str(&content)
|
|
|
|
|
+ .map_err(|e| StreamError::ParseError(format!("Failed to parse JSON: {}", e)))?;
|
|
|
|
|
|
|
|
- let url = format!("https://api.bybit.com/v5/market/tickers?category={market}");
|
|
|
|
|
- let response = reqwest::get(&url).await.map_err(StreamError::FetchError)?;
|
|
|
|
|
- let text = response.text().await.map_err(StreamError::FetchError)?;
|
|
|
|
|
-
|
|
|
|
|
- let exchange_info: Value =
|
|
|
|
|
- sonic_rs::from_str(&text).map_err(|e| StreamError::ParseError(e.to_string()))?;
|
|
|
|
|
-
|
|
|
|
|
- let result_list: &Vec<Value> = exchange_info["result"]["list"]
|
|
|
|
|
- .as_array()
|
|
|
|
|
- .ok_or_else(|| StreamError::ParseError("Result list is not an array".to_string()))?;
|
|
|
|
|
|
|
|
|
|
let mut ticker_prices_map = HashMap::new();
|
|
let mut ticker_prices_map = HashMap::new();
|
|
|
-
|
|
|
|
|
- let re = Regex::new(r"^[a-zA-Z0-9]+$").unwrap();
|
|
|
|
|
-
|
|
|
|
|
- for item in result_list {
|
|
|
|
|
- let symbol = item["symbol"]
|
|
|
|
|
- .as_str()
|
|
|
|
|
- .ok_or_else(|| StreamError::ParseError("Symbol not found".to_string()))?;
|
|
|
|
|
-
|
|
|
|
|
- if !re.is_match(symbol) {
|
|
|
|
|
- continue;
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- let mark_price = item["lastPrice"]
|
|
|
|
|
- .as_str()
|
|
|
|
|
- .ok_or_else(|| StreamError::ParseError("Mark price not found".to_string()))?
|
|
|
|
|
- .parse::<f32>()
|
|
|
|
|
- .map_err(|_| StreamError::ParseError("Failed to parse mark price".to_string()))?;
|
|
|
|
|
-
|
|
|
|
|
- let daily_price_chg = item["price24hPcnt"]
|
|
|
|
|
- .as_str()
|
|
|
|
|
- .ok_or_else(|| StreamError::ParseError("Daily price change not found".to_string()))?
|
|
|
|
|
- .parse::<f32>()
|
|
|
|
|
- .map_err(|_| {
|
|
|
|
|
- StreamError::ParseError("Failed to parse daily price change".to_string())
|
|
|
|
|
- })?;
|
|
|
|
|
-
|
|
|
|
|
- let daily_volume = item["volume24h"]
|
|
|
|
|
- .as_str()
|
|
|
|
|
- .ok_or_else(|| StreamError::ParseError("Daily volume not found".to_string()))?
|
|
|
|
|
- .parse::<f32>()
|
|
|
|
|
- .map_err(|_| StreamError::ParseError("Failed to parse daily volume".to_string()))?;
|
|
|
|
|
-
|
|
|
|
|
|
|
+ for ticker in ticker_json.as_array().unwrap() {
|
|
|
|
|
+ let symbol = ticker["symbol"].as_str().unwrap();
|
|
|
|
|
+ let mark_price = 0f32;
|
|
|
|
|
+ let daily_price_chg = 0f32;
|
|
|
|
|
+ let daily_volume = 0f32;
|
|
|
let quote_volume = daily_volume * mark_price;
|
|
let quote_volume = daily_volume * mark_price;
|
|
|
|
|
|
|
|
- if quote_volume < 4_000_000.0 {
|
|
|
|
|
- continue;
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
let ticker_stats = TickerStats {
|
|
let ticker_stats = TickerStats {
|
|
|
mark_price,
|
|
mark_price,
|
|
|
daily_price_chg: daily_price_chg * 100.0,
|
|
daily_price_chg: daily_price_chg * 100.0,
|