| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162 |
- // use std::collections::BTreeMap;
- // use std::sync::Arc;
- // use std::sync::atomic::AtomicBool;
- // use futures_util::StreamExt;
- // use rust_decimal::Decimal;
- // use tokio::sync::Mutex;
- // use exchanges::kucoin_spot_ws::{KucoinSpotSubscribeType, KucoinSpotWs, KucoinSpotWsType};
- // use exchanges::response_base::ResponseData;
- // use global::trace_stack::TraceStack;
- // use standard::exchange::ExchangeEnum::KucoinSpot;
- // use crate::exchange_disguise::on_special_depth;
- // use crate::model::OriginalTradeGa;
- // use crate::core::Core;
- //
- // // 1交易、0参考 kucoin 现货 启动
- // pub async fn kucoin_spot_run(is_shutdown_arc: Arc<AtomicBool>,
- // _is_trade: bool,
- // core_arc: Arc<Mutex<Core>>,
- // name: String,
- // symbols: Vec<String>,
- // is_colo: bool,
- // _exchange_params: BTreeMap<String, String>) {
- // let mut symbol_arr = Vec::new();
- // for symbol in symbols {
- // let symbol_mapper = standard::utils::symbol_enter_mapper(KucoinSpot, symbol.as_str());
- // let new_symbol = symbol_mapper.replace("_", "-").to_uppercase();
- // symbol_arr.push(new_symbol);
- // }
- //
- // let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
- // let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded();
- //
- // let mut ws = KucoinSpotWs::new_label(name.clone(), is_colo, None, KucoinSpotWsType::Public).await;
- // ws.set_symbols(symbol_arr);
- // ws.set_subscribe(vec![
- // KucoinSpotSubscribeType::PuSpotMarketLevel2Depth50,
- // // KucoinSpotSubscribeType::PuMarketTicker, // python说:订阅 ticker来的很慢
- // KucoinSpotSubscribeType::PuMarketMatch,
- // ]);
- //
- // // 开启ws
- // let write_tx_am = Arc::new(Mutex::new(write_tx));
- // tokio::spawn(async move {
- // //链接
- // let bool_v3_clone = Arc::clone(&is_shutdown_arc);
- // ws.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
- // });
- // //读取
- // // let is_shutdown_arc_clone = Arc::clone(&is_shutdown_arc);
- // tokio::spawn(async move {
- // let core_arc_clone = Arc::clone(&core_arc);
- // // trade
- // let mut update_flag_u = Decimal::ZERO;
- // let mut max_buy = Decimal::ZERO;
- // let mut min_sell = Decimal::ZERO;
- // let multiplier = Decimal::ONE;
- // // let multiplier = core_arc_clone.lock().await.platform_rest.get_self_market().ct_val;
- // // let run_symbol = symbols.clone()[0].clone();
- //
- // loop {
- // if let Some(data) = read_rx.next().await {
- // on_kucoin_spot_data(core_arc_clone.clone(),
- // &mut update_flag_u,
- // multiplier,
- // &mut max_buy,
- // &mut min_sell,
- // data).await;
- // }
- // }
- // });
- // }
- //
- // async fn on_kucoin_spot_data(core_arc_clone: Arc<Mutex<Core>>,
- // update_flag_u: &mut Decimal,
- // _multiplier: Decimal,
- // max_buy: &mut Decimal,
- // min_sell: &mut Decimal,
- // data: ResponseData) {
- // let mut trace_stack = TraceStack::new(0, Instant::now());
- // trace_stack.on_after_network(data.time);
- // trace_stack.on_before_unlock_core();
- //
- // if data.code != "200".to_string() {
- // return;
- // }
- //
- // if data.channel == "level2" {
- // trace_stack.on_before_format();
- // let special_depth = standard::handle_info::HandleSwapInfo::handle_special_depth(KucoinSpot, data.clone());
- // trace_stack.on_before_network(special_depth.create_at.clone());
- // trace_stack.on_after_format();
- //
- // on_special_depth(core_arc_clone, update_flag_u, data.label, trace_stack, special_depth).await;
- // } else if data.channel == "trade.ticker" {
- // trace_stack.on_before_format();
- // let special_depth = standard::handle_info::HandleSwapInfo::handle_special_ticker(KucoinSpot, data.clone());
- // trace_stack.on_before_network(special_depth.create_at.clone());
- // trace_stack.on_after_format();
- //
- // on_special_depth(core_arc_clone, update_flag_u, data.label, trace_stack, special_depth).await;
- // } else if data.channel == "trade.l3match" {
- // let mut core = core_arc_clone.lock().await;
- // let str = data.label.clone();
- // if core.is_update.contains_key(&data.label) && *core.is_update.get(str.as_str()).unwrap() {
- // *max_buy = Decimal::ZERO;
- // *min_sell = Decimal::ZERO;
- // core.is_update.remove(str.as_str());
- // }
- // let trade: OriginalTradeGa = serde_json::from_str(data.data.as_str()).unwrap();
- // if trade.price > *max_buy || *max_buy == Decimal::ZERO {
- // *max_buy = trade.price
- // }
- // if trade.price < *min_sell || *min_sell == Decimal::ZERO {
- // *min_sell = trade.price
- // }
- // core.max_buy_min_sell_cache.insert(data.label, vec![*max_buy, *min_sell]);
- // } else if data.channel == "availableBalance.change" {
- // // 取消原有推送解析,因为推送的信息不准确
- // // let account = standard::handle_info::HandleSwapInfo::handle_account_info(KucoinSwap, data, run_symbol.clone());
- // // {
- // // let mut core = core_arc_clone.lock().await;
- // // core.update_equity(account);
- // // }
- // } else if data.channel == "symbolOrderChange" {
- // // trace_stack.on_before_format();
- // // let orders = standard::handle_info::HandleSwapInfo::handle_order(KucoinSwap, data.clone(), multiplier);
- // // trace_stack.on_after_format();
- // // let mut order_infos:Vec<OrderInfo> = Vec::new();
- // // for order in orders.order {
- // // if order.status == "NULL" {
- // // continue;
- // // }
- // // let order_info = OrderInfo {
- // // symbol: "".to_string(),
- // // amount: order.amount.abs(),
- // // side: "".to_string(),
- // // price: order.price,
- // // client_id: order.custom_id,
- // // filled_price: order.avg_price,
- // // filled: order.deal_amount.abs(),
- // // order_id: order.id,
- // // local_time: 0,
- // // create_time: 0,
- // // status: order.status,
- // // fee: Default::default(),
- // // trace_stack: Default::default(),
- // // };
- // // order_infos.push(order_info);
- // // }
- // //
- // // {
- // // let mut core = core_arc_clone.lock().await;
- // // core.update_order(order_infos, trace_stack);
- // // }
- // } else if data.channel == "position.change" {
- // // let positions = standard::handle_info::HandleSwapInfo::handle_position(KucoinSwap,data, multiplier);
- // // {
- // // let mut core = core_arc_clone.lock().await;
- // // core.update_position(positions);
- // // }
- // }
- // }
|