use exchanges::gate_swap_rest::GateSwapRest; use std::collections::BTreeMap; use tokio::io::{AsyncReadExt}; use exchanges::kucoin_swap_rest::KucoinSwapRest; use exchanges::kucoin_swap_ws::{KucoinSwapSubscribeType, KucoinSwapWs, KucoinSwapWsType}; use exchanges::{proxy}; use exchanges::okx_swap_ws::{OkxSwapSubscribeType, OkxSwapWs, OkxSwapWsType}; use std::io::{Read, Write}; use std::sync::Arc; use std::sync::atomic::AtomicBool; use tokio::sync::mpsc::channel; use tokio::try_join; use tracing::{trace}; use tracing::instrument::WithSubscriber; use exchanges::binance_swap_rest::BinanceSwapRest; use exchanges::binance_swap_ws::{BinanceSwapSubscribeType, BinanceSwapWs, BinanceSwapWsType}; use exchanges::okx_swap_rest::OkxSwapRest; #[tokio::test] async fn test_import() { global::log_utils::init_log_with_trace(); /*******走代理:根据环境变量配置来决定,如果配置了走代理,没有配置不走*******/ if proxy::ParsingDetail::http_enable_proxy() { trace!("检测有代理配置,配置走代理"); } //获取代理 // demo_get_http_proxy(); //币安---深度socket-公共频道订阅 // demo_pub_ws_ba().await; // 币安-rest-获取账户信息 // demo_rest_ba().await; //本次更新成功 //gate-rest -账户信息 // demo_rest_gate().await; //gate-ws-public-private频道 // demo_ws_gate().await; //kucoin_rest -账户信息 // demo_rest_kucoin().await; //Kucoin-ws--公共频道 // demo_ws_kucoin_pu().await; //Kucoin-ws--私有频道 // demo_ws_kucoin_pr().await; //okx - Business 频道 // demo_ws_okx_bu().await; //okx - public 频道 // demo_ws_okx_pu().await; //okx - rest 频道 // demo_okx_rest().await; // demo_so(); // let mut ku_ws = KucoinSwapWs::new(false, btree_map.clone(), // KucoinWsType::Private, tx).await; // ku_ws.set_subscribe(vec![KucoinSubscribeType::PrContractMarketTradeOrdersSys]); // // let t1 = tokio::spawn(async move { // ku_ws.custom_subscribe(vec!["ACHUSDTM".to_string(), "ROSEUSDTM".to_string()]).await; // }); // // let t2 = tokio::spawn(async move { // let mut stdout = std::io::stdout(); // loop { // if let Ok(received) = rx.try_recv() { // writeln!(stdout, "age: {:?}", received).unwrap(); // } // } // let t01 = tokio::spawn(async move { // loop { // tokio::time::sleep(Duration::from_secs(2)).await; // trace!( "发送-指令: "); // } // }); // let t02 = tokio::spawn(async move { // loop { // tokio::time::sleep(Duration::from_secs(3)).await; // trace!( "接收 -指令: "); // } // }); // try_join!(t01,t02).unwrap(); // let (mut tx_end, mut rx_end) = channel::(1024); // let (mut tx_read, mut rx_read) = channel::(1024); // let mut stream = tokio::net::TcpStream::connect("127.0.0.1:8080").await.unwrap(); // stream.write_all( b"Hello, server!").await.unwrap(); // stream.flush().await.unwrap(); // let mutex_stream = Arc::new(Mutex::new(stream)); // // //捕捉用户的主动发送的订阅指令 // let stream_clone = Arc::clone(&mutex_stream); // let t_1 = tokio::spawn(async move { // loop { // tokio::time::sleep(Duration::from_secs(1)).await; // if let Ok(received) = rx_end.try_recv() { // trace!("动态订阅内容: {:?}", received); // let mut stream_lock = stream_clone.lock().await; // // stream_lock.write_all( b"1111!").await.unwrap(); // // stream_lock.flush().await.unwrap(); // // // stream_lock.write_all(b"Hello, server!").await.unwrap(); // } // } // }); // // // //socket数据获取,装入回显通道 // let stream_clone = Arc::clone(&mutex_stream); // let t_2 = tokio::spawn(async move { // // 创建一个用于存储服务器响应的缓冲区 // let mut buffer = [0; 512]; // loop { // let mut stream_lock = stream_clone.lock().await; // tokio::time::sleep(Duration::from_secs(1)).await; // // let _ = match stream_lock.read(&mut buffer).await { // Ok(n) => { // tokio::time::sleep(Duration::from_secs(1)).await; // if n == 0 { // // 没有读取到数据 // trace!("没有数据,主动发送"); // } else { // // 打印服务器的响应 // trace!("有数据: {}", String::from_utf8_lossy(&buffer[..n])); // tx_read.send(format!("{}", String::from_utf8_lossy(&buffer[..n]))).await.unwrap() // } // } // Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { // // 如果读取操作会阻塞,则等待一会儿再试 // trace!("Would block, sleeping会阻碍睡眠??"); // tokio::time::sleep(Duration::from_secs(3)).await; // continue; // } // Err(e) => { // trace!("Err:{:?}",e); // break; // } // }; // } // }); // // // //socket 数据回显 // let t02 = tokio::spawn(async move { // loop { // tokio::time::sleep(Duration::from_secs(1)).await; // // tx.send("hai!!!".to_string()).await.unwrap(); // if let Ok(received) = rx_read.try_recv() { // trace!("拿到 socket 的数据: {:?}", received); // } // } // }); // // //模拟用户动态发送 // let t03 = tokio::spawn(async move { // loop { // tokio::time::sleep(Duration::from_secs(1)).await; // tx_end.send("这里是 动态订阅".to_string()).await.unwrap(); // } // }); // // try_join!(t_1,t_2,t02,t03).unwrap(); } async fn demo_okx_rest() { // let mut btree_map: BTreeMap = BTreeMap::new(); // btree_map.insert("access_key".to_string(), "".to_string()); // btree_map.insert("secret_key".to_string(), "".to_string()); // btree_map.insert("pass_key".to_string(), "".to_string()); // // let mut okx_rest = OkxSwapRest::new(false, btree_map); // let res_data = okx_rest.get_account("BTC".to_string()).await; // trace!("okx_rest-rest - get_account- {:?}", res_data); } async fn demo_ws_gate() {} fn demo_so() { // 代理服务器地址和端口 // let proxy_address = "127.0.0.1:7890"; // // 目标服务器地址和端口 // let target_address = "wss://ws.okx.com:8443/ws/v5/public"; // // // 建立与代理服务器的连接 // let mut proxy_stream = TcpStream::connect(proxy_address).expect("Failed to connect to proxy"); // // // 发送代理请求 // let request = format!("CONNECT {}\r\n\r\n", target_address); // proxy_stream.write_all(request.as_bytes()).expect("Failed to send proxy request"); // // // 读取代理响应 // let mut response = String::new(); // proxy_stream.read_to_string(&mut response).expect("Failed to read proxy response"); // // // 检查代理响应是否成功 // if !response.contains("200 Connection established") { // trace!("Proxy connection failed: {}", response); // } // // // 从代理连接中获取原始 TCP 流 // let mut tcp_stream = std::mem::ManuallyDrop::new(proxy_stream); // // // 现在你可以使用 `tcp_stream` 来进行与目标服务器的通信 // // 例如,发送和接收数据 // // tcp_stream.write_all(b"Hello, server!").expect("Failed to send data"); // // thread::spawn(move || { // loop { // let mut buffer = [0u8; 1024]; // 用于存储读取的数据的缓冲区 // let bytes_read = tcp_stream.read(&mut buffer).expect("Failed to read data"); // // // 将读取的数据转换为字符串并打印 // if let Ok(data) = std::str::from_utf8(&buffer[..bytes_read]) { // trace!("Received data: {}", data); // } else { // trace!("Received data contains non-UTF8 characters"); // } // } // // }); // // // 最后记得手动释放套接字 // // 注意:不要调用 `drop(tcp_stream)`,因为我们使用了 `ManuallyDrop` 来避免自动释放 // unsafe { // std::mem::ManuallyDrop::drop(&mut tcp_stream); // } } async fn demo_ws_okx_pu() { // let btree_map: BTreeMap = BTreeMap::new(); // let (tx, mut rx) = channel(1024); // let mut ku_ws = OkxSwapWs::new(false, btree_map, OkxWsType::Public, tx); // ku_ws.set_subscribe(vec![OkxSubscribeType::PuIndexTickers]); // let t1 = tokio::spawn(async move { // ku_ws.custom_subscribe(vec!["BTC-USD".to_string()]).await; // }); // let t2 = tokio::spawn(async move { // let mut stdout = std::io::stdout(); // loop { // if let Ok(received) = rx.try_recv() { // writeln!(stdout, "age: {:?}", received).unwrap(); // } // } // }); // try_join!(t1,t2).unwrap(); } async fn demo_ws_okx_bu() { // let btree_map: BTreeMap = BTreeMap::new(); // let (tx, mut rx) = channel(1024); // let mut ku_ws = OkxSwapWs::new(false, btree_map, OkxWsType::Business, tx); // ku_ws.set_subscribe(vec![OkxSubscribeType::BuIndexCandle30m]); // let t1 = tokio::spawn(async move { // ku_ws.custom_subscribe(vec!["BTC-USD".to_string()]).await; // }); // let t2 = tokio::spawn(async move { // let mut stdout = std::io::stdout(); // loop { // if let Ok(received) = rx.try_recv() { // writeln!(stdout, "age: {:?}", received).unwrap(); // } // } // }); // try_join!(t1,t2).unwrap(); } async fn demo_ws_kucoin_pr() { // let mut is_shutdown_arc = Arc::new(AtomicBool::new(true)); // let mut btree_map: BTreeMap = BTreeMap::new(); // btree_map.insert("access_key".to_string(), "".to_string()); // btree_map.insert("secret_key".to_string(), "".to_string()); // btree_map.insert("pass_key".to_string(), "".to_string()); // trace!("----------------------btree_map{:?}", btree_map.clone()); // let (tx, mut rx) = channel(1024); // let mut ku_ws = KucoinSwapWs::new(false, btree_map.clone(), // KucoinWsType::Private, tx).await; // ku_ws.set_subscribe(vec![KucoinSubscribeType::PrContractMarketTradeOrdersSys]); // // let t1 = tokio::spawn(async move { // ku_ws.custom_subscribe(is_shutdown_arc, vec!["ACHUSDTM".to_string(), "ROSEUSDTM".to_string()]).await; // }); // // let t2 = tokio::spawn(async move { // loop { // if let Ok(received) = rx.try_recv() { // trace!( "age: {:?}", received); // } // } // }); // // try_join!(t1,t2).unwrap(); } async fn demo_ws_kucoin_pu() { // let mut is_shutdown_arc = Arc::new(AtomicBool::new(true)); // let btree_map: BTreeMap = BTreeMap::new(); // let (tx, mut rx) = channel(1024); // let mut ku_ws = KucoinSwapWs::new(false, btree_map, KucoinWsType::Public, tx).await; // ku_ws.set_subscribe(vec![ // KucoinSubscribeType::PuContractMarketLevel2Depth50, // KucoinSubscribeType::PuContractMarkettickerV2, // ]); // // let t1 = tokio::spawn(async move { // ku_ws.custom_subscribe(is_shutdown_arc, vec!["ACHUSDTM".to_string(), "ROSEUSDTM".to_string()]).await; // }); // let t2 = tokio::spawn(async move { // loop { // if let Ok(received) = rx.try_recv() { // trace!("age: {:?}", received); // } // } // }); // // try_join!(t1,t2).unwrap(); } async fn demo_rest_kucoin() { let mut btree_map: BTreeMap = BTreeMap::new(); btree_map.insert("access_key".to_string(), "".to_string()); btree_map.insert("secret_key".to_string(), "".to_string()); btree_map.insert("pass_key".to_string(), "".to_string()); let mut kucoin_exc = KucoinSwapRest::new(false, btree_map); // let res_data = kucoin_exc.get_server_time().await; // trace!("kucoin_exc-rest - get_server_time- {:?}", res_data); // trace!("kucoin_exc-rest-get_delays{:?}", kucoin_exc.get_delays() ); // trace!("kucoin_exc-rest -get_avg_delay{:?}", kucoin_exc.get_avg_delay()); // let res_data = kucoin_exc.get_account("USDT".to_string()).await; // trace!("kucoin_exc-rest - get_account- {:?}", res_data); // let res_data = kucoin_exc.get_position("XBT1USDM".to_string()).await; // trace!("kucoin_exc-rest - get_position- {:?}", res_data); // let res_data = kucoin_exc.get_market_details().await; // trace!("kucoin_exc-rest - get_market_details- {:?}", res_data); // let res_data = kucoin_exc.get_ticker("ROSEUSDTM".to_string()).await; // trace!("kucoin_exc-rest - get_ticker- {:?}", res_data); let res_data = kucoin_exc.get_orders("".to_string(), "".to_string()).await; trace!("kucoin_exc-rest - get_orders- {:?}", res_data); // let res_data = kucoin_exc.get_positions("USDT".to_string()).await; // trace!("kucoin_exc-rest - get_positions- {:?}", res_data); // let res_data = kucoin_exc.get_orders_details("111".to_string(), "".to_string()).await; // trace!("kucoin_exc-rest - get_orders_details- {:?}", res_data); // let res_data = kucoin_exc.swap_bazaar_order( // "cs_202309111808".to_string(), // "ROSEUSDTM".to_string(), // "pd".to_string(), // 1, // "10".to_string(), // "0.03856".to_string(), // "limit".to_string(), // ).await; // trace!("kucoin_exc-rest - swap_bazaar_order- {:?} // let res_data = kucoin_exc.cancel_order("".to_string(), "999999".to_string()).await; // trace!("kucoin_exc-rest - cancel_order- {:?}", res_data); // let res_data = kucoin_exc.get_public_token().await; // trace!("kucoin_exc-rest - get_public_token- {:?}", res_data); } async fn demo_rest_gate() { let mut btree_map: BTreeMap = BTreeMap::new(); btree_map.insert("access_key".to_string(), "".to_string()); btree_map.insert("secret_key".to_string(), "".to_string()); let mut gate_exc = GateSwapRest::new(false, btree_map); let res_data = gate_exc.get_account("usdt".to_string()).await; trace!("gate-rest -账户信息{:?}", res_data); trace!("gate-rest -get_delays{:?}", gate_exc.get_delays() ); trace!("gate-rest -get_avg_delay{:?}", gate_exc.get_avg_delay()); // let res_data = gate_exc.get_position("usdt".to_string(), "CYBER_USDT".to_string()).await; // trace!("gate-rest -持仓信息{:?}", res_data); // let res_data = gate_exc.get_ticker("usdt".to_string()).await; // trace!("gate-rest -ticker{:?}", res_data); // let res_data = gate_exc.get_server_time().await; // trace!("gate-rest -get_server_time{:?}", res_data); // let res_data = gate_exc.get_user_position("usdt".to_string()).await; // trace!("gate-rest -get_server_time{:?}", res_data); // let res_data = gate_exc.get_order_details("usdt".to_string(), "11335522".to_string()).await; // trace!("gate-rest -get_order_details{:?}", res_data); // let res_data = gate_exc.get_orders("usd1t".to_string(), "open".to_string()).await; // trace!("gate-rest -get_orders{:?}", res_data); // let params = serde_json::json!({ // "contract":"CYBER_USDT", // "size":-1, // "price":"0", // "tif":"ioc", // }); // let res_data = gate_exc.take_order("usdt".to_string(), params).await; // trace!("gate-rest -get_orders{:?}", res_data); // let res_data = gate_exc.setting_dual_mode("usdt".to_string(), true).await; // trace!("gate-rest -setting_dual_mode{:?}", res_data); // let res_data = gate_exc.setting_dual_leverage("usdt".to_string(), // "CYBER_USDT".to_string(), // "20".to_string(), // ).await; // trace!("gate-rest -setting_dual_mode{:?}", res_data); // let res_data = gate_exc.wallet_transfers("usdt".to_string(), // "CYBER_USDT".to_string(), // "20".to_string(), // ).await; // trace!("gate-rest -setting_dual_mode{:?}", res_data); // let res_data = gate_exc.cancel_order("usdt".to_string(), // "12345".to_string(), // ).await; // trace!("gate-rest -setting_dual_mode{:?}", res_data); // let res_data = gate_exc.cancel_orders("usdt".to_string(), // "CYBER_USDT".to_string(), // ).await; // trace!("gate-rest -cancel_orders{:?}", res_data); // let res_data = gate_exc.order( // "usdt".to_string(), // "long".to_string(), // "buy".to_string(), // "ROSE_USDT".to_string(), // 1, // "0.03888".to_string(), // "t-my-custom-id-001".to_string(), // ).await; // trace!("gate-rest -order{:?}", res_data); // let res_data = gate_exc.my_trades("usdt".to_string()).await; // trace!("gate-rest -my_trades{:?}", res_data); let res_data = gate_exc.account_book("usdt".to_string()).await; trace!("gate-rest -account_book{:?}", res_data); } async fn demo_rest_ba() { let mut btree_map: BTreeMap = BTreeMap::new(); btree_map.insert("access_key".to_string(), "".to_string()); btree_map.insert("secret_key".to_string(), "".to_string()); // let ba_exc = BinanceUsdtSwapRest::new(false, btree_map); // let res_data = ba_exc.get_account().await; let mut ba_exc = BinanceSwapRest::new(false, btree_map); // let res_data = ba_exc.get_server_time().await; // trace!("币安-rest - get_server_time{:?}", res_data); // let res_data = ba_exc.get_exchange_info().await; // trace!("币安-restget_ get_exchange_info{:?}", res_data); let res_data = ba_exc.get_account().await; trace!("币安-rest-获取账户信息{:?}", res_data); // trace!("币安-rest- -get_delays{:?}", ba_exc.get_delays() ); // trace!("币安-rest--get_avg_delay{:?}", ba_exc.get_avg_delay()); // let res_data = ba_exc.get_order("BTCUSDT".to_string(), 131, "".to_string()).await; // trace!("币安-rest--get_order{:?}", res_data); // let res_data = ba_exc.get_open_orders("BTCUSDT".to_string()).await; // trace!("币安-rest--get_open_orders{:?}", res_data); // let timestamp_start = chrono::Utc::now().timestamp_millis() - 60 * 100 * 1000; // let timestamp_end = chrono::Utc::now().timestamp_millis() + 60 * 100 * 1000; // let res_data = ba_exc.get_all_orders("BTCUSDT".to_string(), 1000, timestamp_start, timestamp_end).await; // trace!("币安-rest--get_all_orders{:?}", res_data); // let res_data = ba_exc.get_book_ticker("BTCUSDT".to_string()).await; // trace!("币安-rest--get_book_ticker{:?}", res_data); // let res_data = ba_exc.get_position_risk("BTCUS1DT".to_string()).await; // trace!("币安-rest--get_position_risk{:?}", res_data); // let res_data = ba_exc.change_pos_side(true).await; // trace!("币安-rest--change_pos_side{:?}", res_data); // let res_data = ba_exc.cancel_order("BTCUSDT".to_string(), 3312331, "".to_string()).await; // trace!("币安-rest--cancel_order{:?}", res_data); } async fn demo_pub_ws_ba() { // let mut is_shutdown_arc = Arc::new(AtomicBool::new(true)); // let btree_map: BTreeMap = BTreeMap::new(); // let (tx, mut rx) = channel(1024); // let mut binance_ws = BinanceSwapWs::new(false, btree_map, BinanceWsType::PublicAndPrivate, tx); // binance_ws.set_subscribe(vec![ // BinanceSubscribeType::PuAggTrade, // BinanceSubscribeType::PuDepth20levels100ms, // BinanceSubscribeType::PuBookTicker, // ]); // let t1 = tokio::spawn(async move { // binance_ws.custom_subscribe(is_shutdown_arc,vec!["BTCUSDT".to_string()]).await; // }); // let t2 = tokio::spawn(async move { // loop { // if let Ok(received) = rx.try_recv() { // trace!( "age: {:?}", received); // } // } // }); // try_join!(t1,t2).unwrap(); } fn demo_get_http_proxy() { //代理地址 let parsing_detail = proxy::ParsingDetail::parsing_environment_variables(); trace!("----代理信息:{:?}", parsing_detail); } // /*********************web服务*/ // fn demo_http() { // let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); // for stream in listener.incoming() { // let stream = stream.unwrap(); // // handle_connection(TcpStream::try_from(stream).unwrap()); // } // } // // // fn handle_connection(mut stream: TcpStream) { // let buf_reader = BufReader::new(&mut stream); // let http_request: Vec<_> = buf_reader // .lines() // .map(|result| result.unwrap()) // .take_while(|line| !line.is_empty()) // .collect(); // trace!("Request: {:#?}", http_request); // trace!("Request2: {:#?}", http_request[0]); // trace!("Request3: {:#?}", http_request[1]); // // let (status_line, filename) = if http_request[0] == "GET / HTTP/1.1" { // ("HTTP/1.1 200 OK", "hello.html") // } else { // ("HTTP/1.1 404 NOT FOUND", "404.html") // }; // // let status_line = "HTTP/1.1 200 OK"; // let contents = fs::read_to_string("src/404.html").unwrap(); // let length = contents.len(); // // let response = // format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}"); // // let response = "HTTP/1.1 200 OK\r\n\r\nccccc"; // // stream.write_all(response.as_bytes()).unwrap(); // }