use std::collections::BTreeMap; use std::str::FromStr; use std::sync::Arc; use std::sync::atomic::AtomicBool; use rust_decimal::Decimal; use serde_json::Value; use tokio::sync::Mutex; use tracing::{error, info, trace}; use exchanges::coinex_swap_rest::CoinexSwapRest; use exchanges::coinex_swap_ws::{CoinexSwapLogin, CoinexSwapSubscribeType, CoinexSwapWs}; use exchanges::response_base::ResponseData; use global::log_utils::{init_log_with_info, init_log_with_trace}; const ACCESS_KEY: &str = ""; const SECRET_KEY: &str = ""; //ws-订阅公共频道信息 #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn ws_custom_subscribe() { global::log_utils::init_log_with_trace(); let (write_tx, write_rx) = futures_channel::mpsc::unbounded(); let (_read_tx, mut read_rx) = futures_channel::mpsc::unbounded::(); // let (write_tx, write_rx) = tokio::sync::broadcast::channel::(10); // let (read_tx, mut read_rx) = tokio::sync::broadcast::channel::(10); let write_tx_am = Arc::new(Mutex::new(write_tx)); let is_shutdown_arc = Arc::new(AtomicBool::new(true)); //读取 let _is_shutdown_arc_clone = Arc::clone(&is_shutdown_arc); let _tr = tokio::spawn(async move { trace!("线程-数据读取-开启"); loop { // 从通道中接收并丢弃所有的消息,直到通道为空 while let Ok(Some(_data)) = read_rx.try_next() { // 从通道中接收并丢弃所有的消息,直到通道为空 while let Ok(Some(_)) = read_rx.try_next() { // 消息被忽略 } } } // trace!("线程-数据读取-结束"); }); //写数据 // let bool_v2_clone = Arc::clone(&is_shutdown_arc); // let write_tx_clone = Arc::clone(&write_tx_am); // let su = ws.get_subscription(); // let tw = tokio::spawn(async move { // trace!("线程-数据写入-开始"); // loop { // tokio::time::sleep(Duration::from_millis(20 * 1000)).await; // // let close_frame = CloseFrame { // // code: CloseCode::Normal, // // reason: Cow::Borrowed("Bye bye"), // // }; // // let message = Message::Close(Some(close_frame)); // // // let message = Message::Text(su.clone()); // AbstractWsMode::send_subscribe(write_tx_clone.clone(), message.clone()).await; // trace!("发送指令成功"); // } // trace!("线程-数据写入-结束"); // }); let fun = move |data: ResponseData| { async move { println!("{:?}", data); } }; let param = CoinexSwapLogin { api_key: ACCESS_KEY.to_string(), secret: SECRET_KEY.to_string(), }; let t1 = tokio::spawn(async move { let mut ws = get_ws(Option::from(param)); ws.set_symbols(vec!["FTM_USDT".to_string()]); ws.set_subscribe(vec![ // CoinexSwapSubscribeType::PuFuturesDepth, CoinexSwapSubscribeType::PrFuturesOrders ]); //链接 let bool_v3_clone = Arc::clone(&is_shutdown_arc); ws.ws_connect_async(bool_v3_clone, fun, &write_tx_am, write_rx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)"); trace!("test 唯一线程结束--"); }); tokio::try_join!(t1).unwrap(); trace!("当此结束"); trace!("重启!"); trace!("参考交易所关闭"); return; } fn get_ws(btree_map: Option) -> CoinexSwapWs { let coinex_ws = CoinexSwapWs::new(btree_map); coinex_ws } #[tokio::test] async fn rest_account_book_test() { let mut ret = get_rest(); let req_data = ret.get_account().await; let res_data_json: Value = req_data.data; println!("coinex--查询合约账户--{:?}", res_data_json); let result = res_data_json.as_array().unwrap().get(0).unwrap(); println!("coinex--查询合约账户--{:?}", result["ccy"]); } #[tokio::test] async fn rest_spot_account_book_test() { let mut ret = get_rest(); let req_data = ret.get_spot_account().await; let res_data_json: Value = req_data.data; println!("coinex--查询现货账户--{:?}", res_data_json); let result = res_data_json.as_array().unwrap().get(0).unwrap(); println!("coinex--查询现货账户--{:?}", result["ccy"]); } #[tokio::test] async fn rest_position_test() { let mut ret = get_rest(); let req_data = ret.get_position("BOMEUSDT".to_string()).await; println!("coinex--查询仓位--{:?}", req_data); } #[tokio::test] async fn rest_positions_test() { let mut ret = get_rest(); let req_data = ret.get_user_position().await; println!("coinex--查询用户所有仓位--{:?}", req_data); } #[tokio::test] async fn rest_ticker_test() { let mut ret = get_rest(); let req_data = ret.get_ticker("BOMEUSDT".to_string()).await; println!("coinex--查询ticker--{:?}", req_data); } #[tokio::test] async fn rest_market_test() { let mut ret = get_rest(); let req_data = ret.get_market_details("BOMEUSDT".to_string()).await; println!("coinex--查询market--{:?}", req_data); } #[tokio::test] async fn rest_market_list_test() { let symbol = "DOGEUSDT".to_string(); let price = Decimal::from_str("0.15").unwrap(); let custom_id = "436265461"; let order_side; let position_side; let amount = Decimal::from_str("1769").unwrap(); let ct_val = Decimal::from_str("1").unwrap(); let origin_side = "kk"; let size = (amount / ct_val).floor(); match origin_side { "kd" => { position_side = "long"; order_side = "buy"; } "pd" => { position_side = "long"; order_side = "sell"; } "kk" => { position_side = "short"; order_side = "sell"; } "pk" => { position_side = "short"; order_side = "buy"; } _ => { error!("下单参数错误"); position_side = "error"; order_side = "error"; } }; let mut ret = get_rest(); let req_data = ret.order(symbol, position_side.to_string(), order_side.to_string(), size, price, custom_id.to_string()).await; println!("coinex--查询swap_order--{:?}", req_data); } #[tokio::test] async fn rest_cancel_order_test() { let mut ret = get_rest(); let req_data = ret.cancel_order("DOGEUSDT".to_string(), "", "436265461").await; println!("coinex--查询cancel_order--{} {:?}", req_data.data.is_null(), req_data); } #[tokio::test] async fn rest_cancel_all_order_test() { let mut ret = get_rest(); let _ct_val = Decimal::ONE; let orders_res_data = ret.get_pending_orders().await; let mut result = vec![]; if orders_res_data.code == 200 { let orders_res_data_json = orders_res_data.data.as_array().unwrap(); for order in orders_res_data_json { let cancel_res_data = ret.cancel_order_all(order["market"].as_str().unwrap().to_string()).await; if cancel_res_data.code == 200 { result.push(order.clone()) } } } println!("coinex--查询cancel_all_order--{:?}", result); } #[tokio::test] async fn rest_finish_order_list_test() { let mut ret = get_rest(); let req_data = ret.get_finished_orders().await; println!("coinex--查询finish_order--{:?}", req_data); } #[tokio::test] async fn rest_pending_order_list_test() { let mut ret = get_rest(); let req_data = ret.get_pending_orders().await; println!("coinex--查询pending_order--{:?}", req_data); } #[tokio::test] async fn rest_order_status_test() { let mut ret = get_rest(); let req_data = ret.get_order_details("136925916412".to_string(), "DOGEUSDT".to_string()).await; println!("coinex--查询pending_order--{:?}", req_data); } #[tokio::test] async fn rest_setting_dual_leverage_test() { let mut ret = get_rest(); let req_data = ret.setting_dual_leverage("FTMUSDT".to_string(), 10).await; println!("coinex--查询setting_dual_leverage--{:?}", req_data); } #[tokio::test] async fn rest_time() { let mut ret = get_rest(); let res_data = ret.get_server_time().await; let res_data_json: Value = res_data.data; let result = res_data_json["timestamp"].to_string(); println!("coinex--time--{:?}", result); } #[tokio::test] async fn rest_account_get_test() { let mut ret = get_rest(); let res_data = ret.account_get().await; let res_data_json: Value = res_data.data; if res_data_json.is_array() { let array = res_data_json.as_array().unwrap(); for z in array { trace!("coinex--查看子账号列表--{:?}", z); } } } #[tokio::test] async fn rest_account_subs_api_test() { let mut ret = get_rest(); let params = serde_json::json!({ "sub_user_name":"2024050", "ip_whitelist":[], "trade_enabled":false, "remark":"", }); let res_data = ret.account_subs_api(params).await; // let res_data_json: Value = res_data.data; // let result = res_data_json["timestamp"].to_string(); println!("coinex--获取子账号的apikey--{:?}", res_data); } // 将主账号下的 子账号 都创建apikey #[tokio::test] async fn rest_account_get_and_api_test() { let mut ret = get_rest(); let res_data = ret.account_get().await; let res_data_json: Value = res_data.data; if res_data_json.is_array() { let array = res_data_json.as_array().unwrap(); for z in array { let sub_user_name = z["sub_user_name"].as_str().unwrap(); info!("coinex--查看子账号列表--{:?}", z); // //每个账号创建apiKey // let params2 = serde_json::json!({ // "sub_user_name":sub_user_name, // "ip_whitelist":[], // "trade_enabled":false, // "remark":"", // }); // let res_data2 = ret.account_subs_api(params2).await; // // info!("coinex--创建成功响应--{:?}", res_data2); // let res_data_json2: Value = res_data2.data; // let api_id = res_data_json2["api_id"].as_i64().unwrap(); // let access_id = res_data_json2["access_id"].as_str().unwrap(); // let secret_key = res_data_json2["secret_key"].as_str().unwrap(); // info!("sub_user_name:{:?} \tapi_id:{:?} \t access_id:{:?}\t secret_key:{:?}",sub_user_name,api_id,access_id,secret_key); } } } // 获取子账号 APIKEY 列表 #[tokio::test] async fn rest_account_get_apikey_test() { let str = "api_id access_id secret_key"; let mut ret = get_rest(); let res_data = ret.account_get().await; let res_data_json: Value = res_data.data; if res_data_json.is_array() { let array = res_data_json.as_array().unwrap(); for z in array { let sub_user_name = z["sub_user_name"].as_str().unwrap(); info!("coinex--查看子账号列表--{:?}", sub_user_name); //每个账号创建apiKey let params = serde_json::json!({ "sub_user_name":sub_user_name, }); let res_data2 = ret.account_get_apikey(params).await; // trace!("coinex--获取子账号的apikey--{:?}", res_data2); let res_data_json2: Value = res_data2.data; if res_data_json2.is_array() { let array2 = res_data_json2.as_array().unwrap(); for z2 in array2 { let api_id = z2["api_id"].as_i64().unwrap(); let access_id = z2["access_id"].as_str().unwrap(); info!("sub_user_name:{:?} \tapi_id:{:?} \t access_id:{:?}\t ",sub_user_name,api_id,access_id); //查询详情 let params3 = serde_json::json!({ "api_id":api_id }); let res_data3 = ret.account_get_detail(params3).await; info!("coinex--详情--{:?}", res_data3.data); //编辑子账号 APIKEY // let params4 = serde_json::json!({ // "sub_user_name":sub_user_name, // "api_id":api_id, // "trade_enabled":true // }); // let res_data4 = ret.account_get_update(params4).await; // info!("coinex--编辑结果--{:?}", res_data4.data); //删除 // let params2 = serde_json::json!({ // "api_id":api_id // }); // let res_data23 = ret.account_del_apikey(params2).await; // info!("coinex--删除结果--{:?}", res_data23); } } } } // info!("sub_user_name:{:?} \tapi_id:{:?} \t access_id:{:?}\t ",sub_user_name,api_id,access_id) } fn get_rest() -> CoinexSwapRest { // init_log_with_trace(); init_log_with_info(); let mut btree_map: BTreeMap = BTreeMap::new(); btree_map.insert("access_key".to_string(), ACCESS_KEY.to_string()); btree_map.insert("secret_key".to_string(), SECRET_KEY.to_string()); let ba_exc = CoinexSwapRest::new(btree_map); ba_exc }