||
- use exchanges::gate_swap_rest::GateSwapRest;
- use std::collections::BTreeMap;
- use tokio::io::{AsyncReadExt};
- use exchanges::kucoin_swap_rest::KucoinSwapRest;
- use exchanges::kucoin_swap_ws::{KucoinSwapSubscribeType, KucoinSwapWs, KucoinSwapWsType};
- use exchanges::{proxy};
- use exchanges::okx_swap_ws::{OkxSwapSubscribeType, OkxSwapWs, OkxSwapWsType};
- use std::io::{Read, Write};
- use std::sync::Arc;
- use std::sync::atomic::AtomicBool;
- use tokio::sync::mpsc::channel;
- use tokio::try_join;
- use tracing::{trace};
- use tracing::instrument::WithSubscriber;
- use exchanges::binance_swap_rest::BinanceSwapRest;
- use exchanges::binance_swap_ws::{BinanceSwapSubscribeType, BinanceSwapWs, BinanceSwapWsType};
- use exchanges::okx_swap_rest::OkxSwapRest;
- #[tokio::test]
- async fn test_import() {
- global::log_utils::init_log_with_trace();
- /*******走代理:根据环境变量配置来决定,如果配置了走代理,没有配置不走*******/
- if proxy::ParsingDetail::http_enable_proxy() {
- trace!("检测有代理配置,配置走代理");
- }
- //获取代理
- // demo_get_http_proxy();
- //币安---深度socket-公共频道订阅
- // demo_pub_ws_ba().await;
- // 币安-rest-获取账户信息
- // demo_rest_ba().await;
- //本次更新成功
- //gate-rest -账户信息
- // demo_rest_gate().await;
- //gate-ws-public-private频道
- // demo_ws_gate().await;
- //kucoin_rest -账户信息
- // demo_rest_kucoin().await;
- //Kucoin-ws--公共频道
- // demo_ws_kucoin_pu().await;
- //Kucoin-ws--私有频道
- // demo_ws_kucoin_pr().await;
- //okx - Business 频道
- // demo_ws_okx_bu().await;
- //okx - public 频道
- // demo_ws_okx_pu().await;
- //okx - rest 频道
- // demo_okx_rest().await;
- // demo_so();
- // let mut ku_ws = KucoinSwapWs::new(false, btree_map.clone(),
- // KucoinWsType::Private, tx).await;
- // ku_ws.set_subscribe(vec![KucoinSubscribeType::PrContractMarketTradeOrdersSys]);
- //
- // let t1 = tokio::spawn(async move {
- // ku_ws.custom_subscribe(vec!["ACHUSDTM".to_string(), "ROSEUSDTM".to_string()]).await;
- // });
- //
- // let t2 = tokio::spawn(async move {
- // let mut stdout = std::io::stdout();
- // loop {
- // if let Ok(received) = rx.try_recv() {
- // writeln!(stdout, "age: {:?}", received).unwrap();
- // }
- // }
- // let t01 = tokio::spawn(async move {
- // loop {
- // tokio::time::sleep(Duration::from_secs(2)).await;
- // trace!( "发送-指令: ");
- // }
- // });
- // let t02 = tokio::spawn(async move {
- // loop {
- // tokio::time::sleep(Duration::from_secs(3)).await;
- // trace!( "接收 -指令: ");
- // }
- // });
- // try_join!(t01,t02).unwrap();
- // let (mut tx_end, mut rx_end) = channel::<String>(1024);
- // let (mut tx_read, mut rx_read) = channel::<String>(1024);
- // let mut stream = tokio::net::TcpStream::connect("127.0.0.1:8080").await.unwrap();
- // stream.write_all( b"Hello, server!").await.unwrap();
- // stream.flush().await.unwrap();
- // let mutex_stream = Arc::new(Mutex::new(stream));
- //
- // //捕捉用户的主动发送的订阅指令
- // let stream_clone = Arc::clone(&mutex_stream);
- // let t_1 = tokio::spawn(async move {
- // loop {
- // tokio::time::sleep(Duration::from_secs(1)).await;
- // if let Ok(received) = rx_end.try_recv() {
- // trace!("动态订阅内容: {:?}", received);
- // let mut stream_lock = stream_clone.lock().await;
- // // stream_lock.write_all( b"1111!").await.unwrap();
- // // stream_lock.flush().await.unwrap();
- //
- // // stream_lock.write_all(b"Hello, server!").await.unwrap();
- // }
- // }
- // });
- //
- //
- // //socket数据获取,装入回显通道
- // let stream_clone = Arc::clone(&mutex_stream);
- // let t_2 = tokio::spawn(async move {
- // // 创建一个用于存储服务器响应的缓冲区
- // let mut buffer = [0; 512];
- // loop {
- // let mut stream_lock = stream_clone.lock().await;
- // tokio::time::sleep(Duration::from_secs(1)).await;
- //
- // let _ = match stream_lock.read(&mut buffer).await {
- // Ok(n) => {
- // tokio::time::sleep(Duration::from_secs(1)).await;
- // if n == 0 {
- // // 没有读取到数据
- // trace!("没有数据,主动发送");
- // } else {
- // // 打印服务器的响应
- // trace!("有数据: {}", String::from_utf8_lossy(&buffer[..n]));
- // tx_read.send(format!("{}", String::from_utf8_lossy(&buffer[..n]))).await.unwrap()
- // }
- // }
- // Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
- // // 如果读取操作会阻塞,则等待一会儿再试
- // trace!("Would block, sleeping会阻碍睡眠??");
- // tokio::time::sleep(Duration::from_secs(3)).await;
- // continue;
- // }
- // Err(e) => {
- // trace!("Err:{:?}",e);
- // break;
- // }
- // };
- // }
- // });
- //
- //
- // //socket 数据回显
- // let t02 = tokio::spawn(async move {
- // loop {
- // tokio::time::sleep(Duration::from_secs(1)).await;
- // // tx.send("hai!!!".to_string()).await.unwrap();
- // if let Ok(received) = rx_read.try_recv() {
- // trace!("拿到 socket 的数据: {:?}", received);
- // }
- // }
- // });
- //
- // //模拟用户动态发送
- // let t03 = tokio::spawn(async move {
- // loop {
- // tokio::time::sleep(Duration::from_secs(1)).await;
- // tx_end.send("这里是 动态订阅".to_string()).await.unwrap();
- // }
- // });
- //
- // try_join!(t_1,t_2,t02,t03).unwrap();
- }
- async fn demo_okx_rest() {
- // let mut btree_map: BTreeMap<String, String> = BTreeMap::new();
- // btree_map.insert("access_key".to_string(), "".to_string());
- // btree_map.insert("secret_key".to_string(), "".to_string());
- // btree_map.insert("pass_key".to_string(), "".to_string());
- //
- // let mut okx_rest = OkxSwapRest::new(false, btree_map);
- // let res_data = okx_rest.get_account("BTC".to_string()).await;
- // trace!("okx_rest-rest - get_account- {:?}", res_data);
- }
- async fn demo_ws_gate() {}
- fn demo_so() {
- // 代理服务器地址和端口
- // let proxy_address = "127.0.0.1:7890";
- // // 目标服务器地址和端口
- // let target_address = "wss://ws.okx.com:8443/ws/v5/public";
- //
- // // 建立与代理服务器的连接
- // let mut proxy_stream = TcpStream::connect(proxy_address).expect("Failed to connect to proxy");
- //
- // // 发送代理请求
- // let request = format!("CONNECT {}\r\n\r\n", target_address);
- // proxy_stream.write_all(request.as_bytes()).expect("Failed to send proxy request");
- //
- // // 读取代理响应
- // let mut response = String::new();
- // proxy_stream.read_to_string(&mut response).expect("Failed to read proxy response");
- //
- // // 检查代理响应是否成功
- // if !response.contains("200 Connection established") {
- // trace!("Proxy connection failed: {}", response);
- // }
- //
- // // 从代理连接中获取原始 TCP 流
- // let mut tcp_stream = std::mem::ManuallyDrop::new(proxy_stream);
- //
- // // 现在你可以使用 `tcp_stream` 来进行与目标服务器的通信
- // // 例如,发送和接收数据
- // // tcp_stream.write_all(b"Hello, server!").expect("Failed to send data");
- //
- // thread::spawn(move || {
- // loop {
- // let mut buffer = [0u8; 1024]; // 用于存储读取的数据的缓冲区
- // let bytes_read = tcp_stream.read(&mut buffer).expect("Failed to read data");
- //
- // // 将读取的数据转换为字符串并打印
- // if let Ok(data) = std::str::from_utf8(&buffer[..bytes_read]) {
- // trace!("Received data: {}", data);
- // } else {
- // trace!("Received data contains non-UTF8 characters");
- // }
- // }
- //
- // });
- //
- // // 最后记得手动释放套接字
- // // 注意:不要调用 `drop(tcp_stream)`,因为我们使用了 `ManuallyDrop` 来避免自动释放
- // unsafe {
- // std::mem::ManuallyDrop::drop(&mut tcp_stream);
- // }
- }
- async fn demo_ws_okx_pu() {
- // let btree_map: BTreeMap<String, String> = BTreeMap::new();
- // let (tx, mut rx) = channel(1024);
- // let mut ku_ws = OkxSwapWs::new(false, btree_map, OkxWsType::Public, tx);
- // ku_ws.set_subscribe(vec![OkxSubscribeType::PuIndexTickers]);
- // let t1 = tokio::spawn(async move {
- // ku_ws.custom_subscribe(vec!["BTC-USD".to_string()]).await;
- // });
- // let t2 = tokio::spawn(async move {
- // let mut stdout = std::io::stdout();
- // loop {
- // if let Ok(received) = rx.try_recv() {
- // writeln!(stdout, "age: {:?}", received).unwrap();
- // }
- // }
- // });
- // try_join!(t1,t2).unwrap();
- }
- async fn demo_ws_okx_bu() {
- // let btree_map: BTreeMap<String, String> = BTreeMap::new();
- // let (tx, mut rx) = channel(1024);
- // let mut ku_ws = OkxSwapWs::new(false, btree_map, OkxWsType::Business, tx);
- // ku_ws.set_subscribe(vec![OkxSubscribeType::BuIndexCandle30m]);
- // let t1 = tokio::spawn(async move {
- // ku_ws.custom_subscribe(vec!["BTC-USD".to_string()]).await;
- // });
- // let t2 = tokio::spawn(async move {
- // let mut stdout = std::io::stdout();
- // loop {
- // if let Ok(received) = rx.try_recv() {
- // writeln!(stdout, "age: {:?}", received).unwrap();
- // }
- // }
- // });
- // try_join!(t1,t2).unwrap();
- }
- async fn demo_ws_kucoin_pr() {
- // let mut bool_v1 = Arc::new(AtomicBool::new(true));
- // let mut btree_map: BTreeMap<String, String> = BTreeMap::new();
- // btree_map.insert("access_key".to_string(), "".to_string());
- // btree_map.insert("secret_key".to_string(), "".to_string());
- // btree_map.insert("pass_key".to_string(), "".to_string());
- // trace!("----------------------btree_map{:?}", btree_map.clone());
- // let (tx, mut rx) = channel(1024);
- // let mut ku_ws = KucoinSwapWs::new(false, btree_map.clone(),
- // KucoinWsType::Private, tx).await;
- // ku_ws.set_subscribe(vec![KucoinSubscribeType::PrContractMarketTradeOrdersSys]);
- //
- // let t1 = tokio::spawn(async move {
- // ku_ws.custom_subscribe(bool_v1, vec!["ACHUSDTM".to_string(), "ROSEUSDTM".to_string()]).await;
- // });
- //
- // let t2 = tokio::spawn(async move {
- // loop {
- // if let Ok(received) = rx.try_recv() {
- // trace!( "age: {:?}", received);
- // }
- // }
- // });
- //
- // try_join!(t1,t2).unwrap();
- }
- async fn demo_ws_kucoin_pu() {
- // let mut bool_v1 = Arc::new(AtomicBool::new(true));
- // let btree_map: BTreeMap<String, String> = BTreeMap::new();
- // let (tx, mut rx) = channel(1024);
- // let mut ku_ws = KucoinSwapWs::new(false, btree_map, KucoinWsType::Public, tx).await;
- // ku_ws.set_subscribe(vec![
- // KucoinSubscribeType::PuContractMarketLevel2Depth50,
- // KucoinSubscribeType::PuContractMarkettickerV2,
- // ]);
- //
- // let t1 = tokio::spawn(async move {
- // ku_ws.custom_subscribe(bool_v1, vec!["ACHUSDTM".to_string(), "ROSEUSDTM".to_string()]).await;
- // });
- // let t2 = tokio::spawn(async move {
- // loop {
- // if let Ok(received) = rx.try_recv() {
- // trace!("age: {:?}", received);
- // }
- // }
- // });
- //
- // try_join!(t1,t2).unwrap();
- }
- async fn demo_rest_kucoin() {
- let mut btree_map: BTreeMap<String, String> = BTreeMap::new();
- btree_map.insert("access_key".to_string(), "".to_string());
- btree_map.insert("secret_key".to_string(), "".to_string());
- btree_map.insert("pass_key".to_string(), "".to_string());
- let mut kucoin_exc = KucoinSwapRest::new(false, btree_map);
- // let res_data = kucoin_exc.get_server_time().await;
- // trace!("kucoin_exc-rest - get_server_time- {:?}", res_data);
- // trace!("kucoin_exc-rest-get_delays{:?}", kucoin_exc.get_delays() );
- // trace!("kucoin_exc-rest -get_avg_delay{:?}", kucoin_exc.get_avg_delay());
- // let res_data = kucoin_exc.get_account("USDT".to_string()).await;
- // trace!("kucoin_exc-rest - get_account- {:?}", res_data);
- // let res_data = kucoin_exc.get_position("XBT1USDM".to_string()).await;
- // trace!("kucoin_exc-rest - get_position- {:?}", res_data);
- // let res_data = kucoin_exc.get_market_details().await;
- // trace!("kucoin_exc-rest - get_market_details- {:?}", res_data);
- // let res_data = kucoin_exc.get_ticker("ROSEUSDTM".to_string()).await;
- // trace!("kucoin_exc-rest - get_ticker- {:?}", res_data);
- let res_data = kucoin_exc.get_orders("".to_string(),
- "".to_string()).await;
- trace!("kucoin_exc-rest - get_orders- {:?}", res_data);
- // let res_data = kucoin_exc.get_positions("USDT".to_string()).await;
- // trace!("kucoin_exc-rest - get_positions- {:?}", res_data);
- // let res_data = kucoin_exc.get_orders_details("111".to_string(), "".to_string()).await;
- // trace!("kucoin_exc-rest - get_orders_details- {:?}", res_data);
- // let res_data = kucoin_exc.swap_bazaar_order(
- // "cs_202309111808".to_string(),
- // "ROSEUSDTM".to_string(),
- // "pd".to_string(),
- // 1,
- // "10".to_string(),
- // "0.03856".to_string(),
- // "limit".to_string(),
- // ).await;
- // trace!("kucoin_exc-rest - swap_bazaar_order- {:?}
- // let res_data = kucoin_exc.cancel_order("".to_string(), "999999".to_string()).await;
- // trace!("kucoin_exc-rest - cancel_order- {:?}", res_data);
- // let res_data = kucoin_exc.get_public_token().await;
- // trace!("kucoin_exc-rest - get_public_token- {:?}", res_data);
- }
- async fn demo_rest_gate() {
- let mut btree_map: BTreeMap<String, String> = BTreeMap::new();
- btree_map.insert("access_key".to_string(), "".to_string());
- btree_map.insert("secret_key".to_string(), "".to_string());
- let mut gate_exc = GateSwapRest::new(false, btree_map);
- let res_data = gate_exc.get_account("usdt".to_string()).await;
- trace!("gate-rest -账户信息{:?}", res_data);
- trace!("gate-rest -get_delays{:?}", gate_exc.get_delays() );
- trace!("gate-rest -get_avg_delay{:?}", gate_exc.get_avg_delay());
- // let res_data = gate_exc.get_position("usdt".to_string(), "CYBER_USDT".to_string()).await;
- // trace!("gate-rest -持仓信息{:?}", res_data);
- // let res_data = gate_exc.get_ticker("usdt".to_string()).await;
- // trace!("gate-rest -ticker{:?}", res_data);
- // let res_data = gate_exc.get_server_time().await;
- // trace!("gate-rest -get_server_time{:?}", res_data);
- // let res_data = gate_exc.get_user_position("usdt".to_string()).await;
- // trace!("gate-rest -get_server_time{:?}", res_data);
- // let res_data = gate_exc.get_order_details("usdt".to_string(), "11335522".to_string()).await;
- // trace!("gate-rest -get_order_details{:?}", res_data);
- // let res_data = gate_exc.get_orders("usd1t".to_string(), "open".to_string()).await;
- // trace!("gate-rest -get_orders{:?}", res_data);
- // let params = serde_json::json!({
- // "contract":"CYBER_USDT",
- // "size":-1,
- // "price":"0",
- // "tif":"ioc",
- // });
- // let res_data = gate_exc.take_order("usdt".to_string(), params).await;
- // trace!("gate-rest -get_orders{:?}", res_data);
- // let res_data = gate_exc.setting_dual_mode("usdt".to_string(), true).await;
- // trace!("gate-rest -setting_dual_mode{:?}", res_data);
- // let res_data = gate_exc.setting_dual_leverage("usdt".to_string(),
- // "CYBER_USDT".to_string(),
- // "20".to_string(),
- // ).await;
- // trace!("gate-rest -setting_dual_mode{:?}", res_data);
- // let res_data = gate_exc.wallet_transfers("usdt".to_string(),
- // "CYBER_USDT".to_string(),
- // "20".to_string(),
- // ).await;
- // trace!("gate-rest -setting_dual_mode{:?}", res_data);
- // let res_data = gate_exc.cancel_order("usdt".to_string(),
- // "12345".to_string(),
- // ).await;
- // trace!("gate-rest -setting_dual_mode{:?}", res_data);
- // let res_data = gate_exc.cancel_orders("usdt".to_string(),
- // "CYBER_USDT".to_string(),
- // ).await;
- // trace!("gate-rest -cancel_orders{:?}", res_data);
- // let res_data = gate_exc.order(
- // "usdt".to_string(),
- // "long".to_string(),
- // "buy".to_string(),
- // "ROSE_USDT".to_string(),
- // 1,
- // "0.03888".to_string(),
- // "t-my-custom-id-001".to_string(),
- // ).await;
- // trace!("gate-rest -order{:?}", res_data);
- // let res_data = gate_exc.my_trades("usdt".to_string()).await;
- // trace!("gate-rest -my_trades{:?}", res_data);
- let res_data = gate_exc.account_book("usdt".to_string()).await;
- trace!("gate-rest -account_book{:?}", res_data);
- }
- async fn demo_rest_ba() {
- let mut btree_map: BTreeMap<String, String> = BTreeMap::new();
- btree_map.insert("access_key".to_string(), "".to_string());
- btree_map.insert("secret_key".to_string(), "".to_string());
- // let ba_exc = BinanceUsdtSwapRest::new(false, btree_map);
- // let res_data = ba_exc.get_account().await;
- let mut ba_exc = BinanceSwapRest::new(false, btree_map);
- // let res_data = ba_exc.get_server_time().await;
- // trace!("币安-rest - get_server_time{:?}", res_data);
- // let res_data = ba_exc.get_exchange_info().await;
- // trace!("币安-restget_ get_exchange_info{:?}", res_data);
- let res_data = ba_exc.get_account().await;
- trace!("币安-rest-获取账户信息{:?}", res_data);
- // trace!("币安-rest- -get_delays{:?}", ba_exc.get_delays() );
- // trace!("币安-rest--get_avg_delay{:?}", ba_exc.get_avg_delay());
- // let res_data = ba_exc.get_order("BTCUSDT".to_string(), 131, "".to_string()).await;
- // trace!("币安-rest--get_order{:?}", res_data);
- // let res_data = ba_exc.get_open_orders("BTCUSDT".to_string()).await;
- // trace!("币安-rest--get_open_orders{:?}", res_data);
- // let timestamp_start = chrono::Utc::now().timestamp_millis() - 60 * 100 * 1000;
- // let timestamp_end = chrono::Utc::now().timestamp_millis() + 60 * 100 * 1000;
- // let res_data = ba_exc.get_all_orders("BTCUSDT".to_string(), 1000, timestamp_start, timestamp_end).await;
- // trace!("币安-rest--get_all_orders{:?}", res_data);
- // let res_data = ba_exc.get_book_ticker("BTCUSDT".to_string()).await;
- // trace!("币安-rest--get_book_ticker{:?}", res_data);
- // let res_data = ba_exc.get_position_risk("BTCUS1DT".to_string()).await;
- // trace!("币安-rest--get_position_risk{:?}", res_data);
- // let res_data = ba_exc.change_pos_side(true).await;
- // trace!("币安-rest--change_pos_side{:?}", res_data);
- // let res_data = ba_exc.cancel_order("BTCUSDT".to_string(), 3312331, "".to_string()).await;
- // trace!("币安-rest--cancel_order{:?}", res_data);
- }
- async fn demo_pub_ws_ba() {
- // let mut bool_v1 = Arc::new(AtomicBool::new(true));
- // let btree_map: BTreeMap<String, String> = BTreeMap::new();
- // let (tx, mut rx) = channel(1024);
- // let mut binance_ws = BinanceSwapWs::new(false, btree_map, BinanceWsType::PublicAndPrivate, tx);
- // binance_ws.set_subscribe(vec![
- // BinanceSubscribeType::PuAggTrade,
- // BinanceSubscribeType::PuDepth20levels100ms,
- // BinanceSubscribeType::PuBookTicker,
- // ]);
- // let t1 = tokio::spawn(async move {
- // binance_ws.custom_subscribe(bool_v1,vec!["BTCUSDT".to_string()]).await;
- // });
- // let t2 = tokio::spawn(async move {
- // loop {
- // if let Ok(received) = rx.try_recv() {
- // trace!( "age: {:?}", received);
- // }
- // }
- // });
- // try_join!(t1,t2).unwrap();
- }
- fn demo_get_http_proxy() {
- //代理地址
- let parsing_detail = proxy::ParsingDetail::parsing_environment_variables();
- trace!("----代理信息:{:?}", parsing_detail);
- }
- // /*********************web服务*/
- // fn demo_http() {
- // let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
- // for stream in listener.incoming() {
- // let stream = stream.unwrap();
- //
- // handle_connection(TcpStream::try_from(stream).unwrap());
- // }
- // }
- //
- //
- // fn handle_connection(mut stream: TcpStream) {
- // let buf_reader = BufReader::new(&mut stream);
- // let http_request: Vec<_> = buf_reader
- // .lines()
- // .map(|result| result.unwrap())
- // .take_while(|line| !line.is_empty())
- // .collect();
- // trace!("Request: {:#?}", http_request);
- // trace!("Request2: {:#?}", http_request[0]);
- // trace!("Request3: {:#?}", http_request[1]);
- //
- // let (status_line, filename) = if http_request[0] == "GET / HTTP/1.1" {
- // ("HTTP/1.1 200 OK", "hello.html")
- // } else {
- // ("HTTP/1.1 404 NOT FOUND", "404.html")
- // };
- //
- // let status_line = "HTTP/1.1 200 OK";
- // let contents = fs::read_to_string("src/404.html").unwrap();
- // let length = contents.len();
- //
- // let response =
- // format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
- // // let response = "HTTP/1.1 200 OK\r\n\r\nccccc";
- //
- // stream.write_all(response.as_bytes()).unwrap();
- // }
|