use std::collections::BTreeMap; use std::str::FromStr; use std::sync::Arc; use std::sync::atomic::AtomicBool; use rust_decimal::Decimal; use serde_json::Value; use tokio::sync::Mutex; use tracing::{error, trace}; use exchanges::coinex_swap_rest::CoinexSwapRest; use exchanges::coinex_swap_ws::{CoinexSwapLogin, CoinexSwapSubscribeType, CoinexSwapWs}; use exchanges::response_base::ResponseData; const ACCESS_KEY: &str = ""; const SECRET_KEY: &str = ""; //ws-订阅公共频道信息 #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn ws_custom_subscribe() { global::log_utils::init_log_with_trace(); let (write_tx, write_rx) = futures_channel::mpsc::unbounded(); let (_read_tx, mut read_rx) = futures_channel::mpsc::unbounded::(); // let (write_tx, write_rx) = tokio::sync::broadcast::channel::(10); // let (read_tx, mut read_rx) = tokio::sync::broadcast::channel::(10); let write_tx_am = Arc::new(Mutex::new(write_tx)); let is_shutdown_arc = Arc::new(AtomicBool::new(true)); //读取 let _is_shutdown_arc_clone = Arc::clone(&is_shutdown_arc); let _tr = tokio::spawn(async move { trace!("线程-数据读取-开启"); loop { // 从通道中接收并丢弃所有的消息,直到通道为空 while let Ok(Some(_data)) = read_rx.try_next() { // 从通道中接收并丢弃所有的消息,直到通道为空 while let Ok(Some(_)) = read_rx.try_next() { // 消息被忽略 } } } // trace!("线程-数据读取-结束"); }); //写数据 // let bool_v2_clone = Arc::clone(&is_shutdown_arc); // let write_tx_clone = Arc::clone(&write_tx_am); // let su = ws.get_subscription(); // let tw = tokio::spawn(async move { // trace!("线程-数据写入-开始"); // loop { // tokio::time::sleep(Duration::from_millis(20 * 1000)).await; // // let close_frame = CloseFrame { // // code: CloseCode::Normal, // // reason: Cow::Borrowed("Bye bye"), // // }; // // let message = Message::Close(Some(close_frame)); // // // let message = Message::Text(su.clone()); // AbstractWsMode::send_subscribe(write_tx_clone.clone(), message.clone()).await; // trace!("发送指令成功"); // } // trace!("线程-数据写入-结束"); // }); let fun = move |data: ResponseData| { async move { println!("{:?}", data); } }; let param = CoinexSwapLogin { api_key: ACCESS_KEY.to_string(), secret: SECRET_KEY.to_string(), }; let t1 = tokio::spawn(async move { let mut ws = get_ws(Option::from(param)); ws.set_symbols(vec!["FTM_USDT".to_string()]); ws.set_subscribe(vec![ // CoinexSwapSubscribeType::PuFuturesDepth, CoinexSwapSubscribeType::PrFuturesOrders ]); //链接 let bool_v3_clone = Arc::clone(&is_shutdown_arc); ws.ws_connect_async(bool_v3_clone, fun, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)"); trace!("test 唯一线程结束--"); }); tokio::try_join!(t1).unwrap(); trace!("当此结束"); trace!("重启!"); trace!("参考交易所关闭"); return; } fn get_ws(btree_map: Option) -> CoinexSwapWs { let coinex_ws = CoinexSwapWs::new(btree_map); coinex_ws } #[tokio::test] async fn rest_account_book_test() { let mut ret = get_rest(); let req_data = ret.get_account().await; let res_data_json: Value = req_data.data; println!("coinex--查询合约账户--{:?}", res_data_json); let result = res_data_json.as_array().unwrap().get(0).unwrap(); println!("coinex--查询合约账户--{:?}", result["ccy"]); } #[tokio::test] async fn rest_spot_account_book_test() { let mut ret = get_rest(); let req_data = ret.get_spot_account().await; let res_data_json: Value = req_data.data; println!("coinex--查询现货账户--{:?}", res_data_json); let result = res_data_json.as_array().unwrap().get(0).unwrap(); println!("coinex--查询现货账户--{:?}", result["ccy"]); } #[tokio::test] async fn rest_position_test() { let mut ret = get_rest(); let req_data = ret.get_position("BOMEUSDT".to_string()).await; println!("coinex--查询仓位--{:?}", req_data); } #[tokio::test] async fn rest_positions_test() { let mut ret = get_rest(); let req_data = ret.get_user_position().await; println!("coinex--查询用户所有仓位--{:?}", req_data); } #[tokio::test] async fn rest_ticker_test() { let mut ret = get_rest(); let req_data = ret.get_ticker("BOMEUSDT".to_string()).await; println!("coinex--查询ticker--{:?}", req_data); } #[tokio::test] async fn rest_market_test() { let mut ret = get_rest(); let req_data = ret.get_market_details("BOMEUSDT".to_string()).await; println!("coinex--查询market--{:?}", req_data); } #[tokio::test] async fn rest_market_list_test() { let symbol = "DOGEUSDT".to_string(); let price = Decimal::from_str("0.15").unwrap(); let custom_id = "436265461"; let order_side; let position_side; let amount = Decimal::from_str("1769").unwrap(); let ct_val = Decimal::from_str("1").unwrap(); let origin_side = "kk"; let size = (amount / ct_val).floor(); match origin_side { "kd" => { position_side = "long"; order_side = "buy"; } "pd" => { position_side = "long"; order_side = "sell"; } "kk" => { position_side = "short"; order_side = "sell"; } "pk" => { position_side = "short"; order_side = "buy"; } _ => { error!("下单参数错误"); position_side = "error"; order_side = "error"; } }; let mut ret = get_rest(); let req_data = ret.order(symbol, position_side.to_string(), order_side.to_string(), size, price, custom_id.to_string()).await; println!("coinex--查询swap_order--{:?}", req_data); } #[tokio::test] async fn rest_cancel_order_test() { let mut ret = get_rest(); let req_data = ret.cancel_order("DOGEUSDT".to_string(), "", "436265461").await; println!("coinex--查询cancel_order--{} {:?}",req_data.data.is_null(), req_data); } #[tokio::test] async fn rest_cancel_all_order_test() { let mut ret = get_rest(); let _ct_val = Decimal::ONE; let orders_res_data = ret.get_pending_orders().await; let mut result = vec![]; if orders_res_data.code == 200 { let orders_res_data_json = orders_res_data.data.as_array().unwrap(); for order in orders_res_data_json { let cancel_res_data = ret.cancel_order_all( order["market"].as_str().unwrap().to_string()).await; if cancel_res_data.code == 200 { result.push(order.clone()) } } } println!("coinex--查询cancel_all_order--{:?}", result); } #[tokio::test] async fn rest_finish_order_list_test() { let mut ret = get_rest(); let req_data = ret.get_finished_orders().await; println!("coinex--查询finish_order--{:?}", req_data); } #[tokio::test] async fn rest_pending_order_list_test() { let mut ret = get_rest(); let req_data = ret.get_pending_orders().await; println!("coinex--查询pending_order--{:?}", req_data); } #[tokio::test] async fn rest_order_status_test() { let mut ret = get_rest(); let req_data = ret.get_order_details("136925916412".to_string(), "DOGEUSDT".to_string()).await; println!("coinex--查询pending_order--{:?}", req_data); } #[tokio::test] async fn rest_setting_dual_leverage_test() { let mut ret = get_rest(); let req_data = ret.setting_dual_leverage("FTMUSDT".to_string(), 10).await; println!("coinex--查询setting_dual_leverage--{:?}", req_data); } #[tokio::test] async fn rest_time() { let mut ret = get_rest(); let res_data = ret.get_server_time().await; let res_data_json: Value = res_data.data; let result = res_data_json["timestamp"].to_string(); println!("coinex--time--{:?}", result); } fn get_rest() -> CoinexSwapRest { let mut btree_map: BTreeMap = BTreeMap::new(); btree_map.insert("access_key".to_string(), ACCESS_KEY.to_string()); btree_map.insert("secret_key".to_string(), SECRET_KEY.to_string()); let ba_exc = CoinexSwapRest::new(btree_map); ba_exc }