Selaa lähdekoodia

先只保留币安参考和gate交易的,先试试水。

skyfffire 1 vuosi sitten
vanhempi
commit
322c58b12f

+ 137 - 137
strategy/src/binance_spot.rs

@@ -1,137 +1,137 @@
-use std::collections::BTreeMap;
-use std::sync::Arc;
-use std::sync::atomic::AtomicBool;
-use futures_util::StreamExt;
-use rust_decimal::Decimal;
-use tokio::spawn;
-use tokio::sync::Mutex;
-use exchanges::binance_spot_ws::{BinanceSpotSubscribeType, BinanceSpotWs, BinanceSpotWsType};
-use exchanges::response_base::ResponseData;
-use global::trace_stack::TraceStack;
-use standard::exchange::ExchangeEnum::BinanceSpot;
-use crate::exchange_disguise::on_special_depth;
-use crate::core::Core;
-
-// 参考 币安 现货 启动
-pub async fn reference_binance_spot_run(bool_v1 :Arc<AtomicBool>,
-                                        core_arc: Arc<Mutex<Core>>,
-                                        name: String,
-                                        symbols: Vec<String>,
-                                        is_colo: bool,
-                                        _exchange_params: BTreeMap<String, String>) {
-    let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
-    let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded();
-
-    let mut ws = BinanceSpotWs::new_label(name.clone(), is_colo, None, BinanceSpotWsType::PublicAndPrivate);
-    ws.set_symbols(symbols.clone());
-    ws.set_subscribe(vec![
-        BinanceSpotSubscribeType::PuBookTicker,
-        // BinanceSpotSubscribeType::PuDepth20levels100ms
-    ]);
-
-    // 开启数据读取线程
-    let write_tx_am = Arc::new(Mutex::new(write_tx));
-    let bot_arc_clone = core_arc.clone();
-
-    spawn(async move {
-        //链接
-        let bool_v3_clone = Arc::clone(&bool_v1);
-        ws.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
-    });
-
-    spawn(async move {
-        // trade
-        let mut max_buy = Decimal::ZERO;
-        let mut min_sell = Decimal::ZERO;
-        // ticker
-        let mut update_flag_u = Decimal::ZERO;
-
-        loop {
-            if let Some(data) = read_rx.next().await {
-                on_data(bot_arc_clone.clone(),
-                        &mut update_flag_u,
-                        &mut max_buy,
-                        &mut min_sell,
-                        data).await;
-            }
-        }
-    });
-
-    // let (tx, mut rx) = channel(100);
-    // spawn(async move {
-    //     let mut ba_exc = BinanceSpotWs::new_label(name, false, None, BinanceSpotWsType::PublicAndPrivate);
-    //     ba_exc.set_subscribe(vec![
-    //         // BinanceSpotSubscribeType::PuAggTrade,
-    //         // BinanceSpotSubscribeType::PuBookTicker,
-    //         BinanceSpotSubscribeType::PuDepth20levels100ms
-    //     ]);
-    //     ba_exc.custom_subscribe(bool_v1, symbols.clone()).await;
-    // });
-    //
-    // spawn(async move {
-    //     let bot_arc_clone = Arc::clone(&core_arc);
-    //     // trade
-    //     let mut max_buy = Decimal::ZERO;
-    //     let mut min_sell = Decimal::ZERO;
-    //     // ticker
-    //     let mut update_flag_u = 0i64;
-    //     loop {
-    //         // sleep(Duration::from_micros(10)).await;
-    //
-    //         match rx.try_recv() {
-    //             Ok(data) => {
-    //                 on_data(bot_arc_clone.clone(), &mut update_flag_u, &mut max_buy, &mut min_sell, data).await;
-    //             },
-    //             Err(_e) => { }
-    //         }
-    //
-    //     }
-    // });
-}
-
-async fn on_data(bot_arc_clone: Arc<Mutex<Core>>,
-                 update_flag_u: &mut Decimal,
-                 _max_buy: &mut Decimal,
-                 _min_sell: &mut Decimal,
-                 data: ResponseData) {
-    let mut trace_stack = TraceStack::default();
-    trace_stack.on_after_network(data.time);
-    trace_stack.on_before_unlock_core();
-
-    if data.code != "200".to_string() {
-        return;
-    }
-    if data.channel == "aggTrade" {
-        // let trade: OriginalTradeBa = serde_json::from_str(data.data.as_str()).unwrap();
-        // let str = data.label.clone();
-        // let mut core = bot_arc_clone.lock().await;
-        // if core.is_update.contains_key(&data.label) && *core.is_update.get(str.as_str()).unwrap() {
-        //     *max_buy = Decimal::ZERO;
-        //     *min_sell = Decimal::ZERO;
-        //     core.is_update.remove(str.as_str());
-        // }
-        // if trade.p > *max_buy || *max_buy == Decimal::ZERO{
-        //     *max_buy = trade.p
-        // }
-        // if trade.p < *min_sell || *min_sell == Decimal::ZERO{
-        //     *min_sell = trade.p
-        // }
-        // {
-        //     core.max_buy_min_sell_cache.insert(data.label, vec![*max_buy, *min_sell]);
-        // }
-    } else if data.channel == "bookTicker" {
-        trace_stack.on_before_format();
-        let special_depth = standard::handle_info::HandleSwapInfo::handle_special_ticker(BinanceSpot, data.clone());
-        trace_stack.on_after_format();
-        trace_stack.on_before_network(special_depth.create_at);
-
-        on_special_depth(bot_arc_clone, update_flag_u, data.label, trace_stack, special_depth).await;
-    } else if data.channel == "depth" {
-        trace_stack.on_before_format();
-        let special_depth = standard::handle_info::HandleSwapInfo::handle_special_depth(BinanceSpot, data.clone());
-        trace_stack.on_after_format();
-        trace_stack.on_before_network(special_depth.create_at);
-
-        on_special_depth(bot_arc_clone, update_flag_u, data.label, trace_stack, special_depth).await;
-    }
-}
+// use std::collections::BTreeMap;
+// use std::sync::Arc;
+// use std::sync::atomic::AtomicBool;
+// use futures_util::StreamExt;
+// use rust_decimal::Decimal;
+// use tokio::spawn;
+// use tokio::sync::Mutex;
+// use exchanges::binance_spot_ws::{BinanceSpotSubscribeType, BinanceSpotWs, BinanceSpotWsType};
+// use exchanges::response_base::ResponseData;
+// use global::trace_stack::TraceStack;
+// use standard::exchange::ExchangeEnum::BinanceSpot;
+// use crate::exchange_disguise::on_special_depth;
+// use crate::core::Core;
+//
+// // 参考 币安 现货 启动
+// pub async fn reference_binance_spot_run(bool_v1 :Arc<AtomicBool>,
+//                                         core_arc: Arc<Mutex<Core>>,
+//                                         name: String,
+//                                         symbols: Vec<String>,
+//                                         is_colo: bool,
+//                                         _exchange_params: BTreeMap<String, String>) {
+//     let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
+//     let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded();
+//
+//     let mut ws = BinanceSpotWs::new_label(name.clone(), is_colo, None, BinanceSpotWsType::PublicAndPrivate);
+//     ws.set_symbols(symbols.clone());
+//     ws.set_subscribe(vec![
+//         BinanceSpotSubscribeType::PuBookTicker,
+//         // BinanceSpotSubscribeType::PuDepth20levels100ms
+//     ]);
+//
+//     // 开启数据读取线程
+//     let write_tx_am = Arc::new(Mutex::new(write_tx));
+//     let bot_arc_clone = core_arc.clone();
+//
+//     spawn(async move {
+//         //链接
+//         let bool_v3_clone = Arc::clone(&bool_v1);
+//         ws.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+//     });
+//
+//     spawn(async move {
+//         // trade
+//         let mut max_buy = Decimal::ZERO;
+//         let mut min_sell = Decimal::ZERO;
+//         // ticker
+//         let mut update_flag_u = Decimal::ZERO;
+//
+//         loop {
+//             if let Some(data) = read_rx.next().await {
+//                 on_data(bot_arc_clone.clone(),
+//                         &mut update_flag_u,
+//                         &mut max_buy,
+//                         &mut min_sell,
+//                         data).await;
+//             }
+//         }
+//     });
+//
+//     // let (tx, mut rx) = channel(100);
+//     // spawn(async move {
+//     //     let mut ba_exc = BinanceSpotWs::new_label(name, false, None, BinanceSpotWsType::PublicAndPrivate);
+//     //     ba_exc.set_subscribe(vec![
+//     //         // BinanceSpotSubscribeType::PuAggTrade,
+//     //         // BinanceSpotSubscribeType::PuBookTicker,
+//     //         BinanceSpotSubscribeType::PuDepth20levels100ms
+//     //     ]);
+//     //     ba_exc.custom_subscribe(bool_v1, symbols.clone()).await;
+//     // });
+//     //
+//     // spawn(async move {
+//     //     let bot_arc_clone = Arc::clone(&core_arc);
+//     //     // trade
+//     //     let mut max_buy = Decimal::ZERO;
+//     //     let mut min_sell = Decimal::ZERO;
+//     //     // ticker
+//     //     let mut update_flag_u = 0i64;
+//     //     loop {
+//     //         // sleep(Duration::from_micros(10)).await;
+//     //
+//     //         match rx.try_recv() {
+//     //             Ok(data) => {
+//     //                 on_data(bot_arc_clone.clone(), &mut update_flag_u, &mut max_buy, &mut min_sell, data).await;
+//     //             },
+//     //             Err(_e) => { }
+//     //         }
+//     //
+//     //     }
+//     // });
+// }
+//
+// async fn on_data(bot_arc_clone: Arc<Mutex<Core>>,
+//                  update_flag_u: &mut Decimal,
+//                  _max_buy: &mut Decimal,
+//                  _min_sell: &mut Decimal,
+//                  data: ResponseData) {
+//     let mut trace_stack = TraceStack::default();
+//     trace_stack.on_after_network(data.time);
+//     trace_stack.on_before_unlock_core();
+//
+//     if data.code != "200".to_string() {
+//         return;
+//     }
+//     if data.channel == "aggTrade" {
+//         // let trade: OriginalTradeBa = serde_json::from_str(data.data.as_str()).unwrap();
+//         // let str = data.label.clone();
+//         // let mut core = bot_arc_clone.lock().await;
+//         // if core.is_update.contains_key(&data.label) && *core.is_update.get(str.as_str()).unwrap() {
+//         //     *max_buy = Decimal::ZERO;
+//         //     *min_sell = Decimal::ZERO;
+//         //     core.is_update.remove(str.as_str());
+//         // }
+//         // if trade.p > *max_buy || *max_buy == Decimal::ZERO{
+//         //     *max_buy = trade.p
+//         // }
+//         // if trade.p < *min_sell || *min_sell == Decimal::ZERO{
+//         //     *min_sell = trade.p
+//         // }
+//         // {
+//         //     core.max_buy_min_sell_cache.insert(data.label, vec![*max_buy, *min_sell]);
+//         // }
+//     } else if data.channel == "bookTicker" {
+//         trace_stack.on_before_format();
+//         let special_depth = standard::handle_info::HandleSwapInfo::handle_special_ticker(BinanceSpot, data.clone());
+//         trace_stack.on_after_format();
+//         trace_stack.on_before_network(special_depth.create_at);
+//
+//         on_special_depth(bot_arc_clone, update_flag_u, data.label, trace_stack, special_depth).await;
+//     } else if data.channel == "depth" {
+//         trace_stack.on_before_format();
+//         let special_depth = standard::handle_info::HandleSwapInfo::handle_special_depth(BinanceSpot, data.clone());
+//         trace_stack.on_after_format();
+//         trace_stack.on_before_network(special_depth.create_at);
+//
+//         on_special_depth(bot_arc_clone, update_flag_u, data.label, trace_stack, special_depth).await;
+//     }
+// }

+ 222 - 222
strategy/src/bitget_spot.rs

@@ -1,222 +1,222 @@
-use std::collections::BTreeMap;
-use std::sync::Arc;
-use std::sync::atomic::AtomicBool;
-use std::time::Duration;
-use futures_util::StreamExt;
-use rust_decimal::Decimal;
-use tokio::spawn;
-use tokio::sync::Mutex;
-use tokio::time::sleep;
-use exchanges::bitget_spot_ws::{BitgetSpotLogin, BitgetSpotSubscribeType, BitgetSpotWs, BitgetSpotWsType};
-use exchanges::response_base::ResponseData;
-use global::trace_stack::TraceStack;
-use standard::exchange::ExchangeEnum::BitgetSpot;
-use crate::exchange_disguise::on_special_depth;
-use crate::model::{OrderInfo, OriginalTradeGa};
-use crate::core::Core;
-
-pub async fn bitget_spot_run(bool_v1 :Arc<AtomicBool>,
-                             is_trade: bool,
-                             core_arc: Arc<Mutex<Core>>,
-                             name: String,
-                             symbols: Vec<String>,
-                             is_colo: bool,
-                             exchange_params: BTreeMap<String, String>) {
-    // 开启公共频道
-    let (write_tx_public, write_rx_public) = futures_channel::mpsc::unbounded();
-    let (read_tx_public, mut read_rx_public) = futures_channel::mpsc::unbounded();
-    let mut ku_public = BitgetSpotWs::new_label(name.clone(),
-                                                is_colo,
-                                                None,
-                                                BitgetSpotWsType::Public);
-    ku_public.set_symbols(symbols.clone());
-    // 交易交易所只用订阅深度数据
-    if is_trade {
-        ku_public.set_subscribe(vec![
-            BitgetSpotSubscribeType::PuBooks5
-        ]);
-    } else {
-        // 参考交易所还要订阅实时订单流数据
-        ku_public.set_subscribe(vec![
-            BitgetSpotSubscribeType::PuTrade,
-            BitgetSpotSubscribeType::PuBooks5
-        ]);
-    }
-    // 开启公共连接
-    let bool_v1_c1 = bool_v1.clone();
-    let write_tx_am_public = Arc::new(Mutex::new(write_tx_public));
-    spawn(async move {
-        ku_public.ws_connect_async(bool_v1_c1,
-                                   &write_tx_am_public,
-                                   write_rx_public,
-                                   read_tx_public)
-            .await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
-    });
-    // 消费数据
-    let bot_arc_clone = core_arc.clone();
-    spawn(async move {
-        // ticker
-        let mut update_flag_u = Decimal::ZERO;
-        let mut max_buy = Decimal::ZERO;
-        let mut min_sell = Decimal::ZERO;
-
-        loop {
-            if let Some(data) = read_rx_public.next().await {
-                on_public_data(bot_arc_clone.clone(),
-                        &mut update_flag_u,
-                        &mut max_buy,
-                        &mut min_sell,
-                        data).await;
-            }
-        }
-    });
-
-    // 开启私有频道
-    if is_trade {
-        // 新增获取余额的协程
-        let account_core_arc = core_arc.clone();
-        spawn(async move {
-            loop {
-                // 每30秒重新获取一次
-                sleep(Duration::from_secs(30)).await;
-
-                {
-                    let mut core = account_core_arc.lock().await;
-                    core.update_equity_rest_spot().await;
-                }
-            }
-        });
-
-        // 其余也要再开两个通道,不能与公共频道混用
-        let (write_tx_private, write_rx_public) = futures_channel::mpsc::unbounded();
-        let (read_tx_private, mut read_rx_public) = futures_channel::mpsc::unbounded();
-        let bool_v1_c2 = bool_v1.clone();
-        let write_tx_am_private = Arc::new(Mutex::new(write_tx_private));
-        spawn(async move {
-            let login_params = parse_btree_map_to_bitget_spot_login(exchange_params);
-            let mut ku_private = BitgetSpotWs::new_label(name.clone(),
-                                                         is_colo,
-                                                         Some(login_params),
-                                                         BitgetSpotWsType::Private);
-            ku_private.set_symbols(symbols.clone());
-            ku_private.set_subscribe(vec![
-                BitgetSpotSubscribeType::PrAccount,
-                BitgetSpotSubscribeType::PrOrders,
-            ]);
-
-            ku_private.ws_connect_async(bool_v1_c2,
-                                        &write_tx_am_private,
-                                        write_rx_public,
-                                        read_tx_private)
-                .await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
-        });
-
-        // 消费数据
-        let bot_arc_clone = core_arc.clone();
-        spawn(async move {
-            let ct_val = core_arc.clone().lock().await.platform_rest.get_self_market().ct_val;
-
-            loop {
-                if let Some(data) = read_rx_public.next().await {
-                    on_private_data(bot_arc_clone.clone(),
-                                   ct_val,
-                                   data).await;
-                }
-            }
-        });
-    }
-}
-
-async fn on_private_data(bot_arc_clone: Arc<Mutex<Core>>, ct_val: Decimal, data: ResponseData) {
-    let mut trace_stack = TraceStack::default();
-
-    trace_stack.on_after_network(data.time);
-    trace_stack.on_before_unlock_core();
-
-    if data.code != "200".to_string() {
-        return;
-    }
-
-    if data.channel == "orders" {
-        trace_stack.on_before_format();
-        let orders = standard::handle_info::HandleSwapInfo::handle_order(BitgetSpot, data.clone(), ct_val);
-        trace_stack.on_after_format();
-        let mut order_infos:Vec<OrderInfo> = Vec::new();
-        for order in orders.order {
-            if order.status == "NULL" {
-                continue;
-            }
-            let order_info = OrderInfo {
-                symbol: "".to_string(),
-                amount: order.amount.abs(),
-                side: "".to_string(),
-                price: order.price,
-                client_id: order.custom_id,
-                filled_price: order.avg_price,
-                filled: order.deal_amount.abs(),
-                order_id: order.id,
-                local_time: 0,
-                create_time: 0,
-                status: order.status,
-                fee: Default::default(),
-                trace_stack: Default::default(),
-            };
-            order_infos.push(order_info);
-        }
-        {
-            let mut core = bot_arc_clone.lock().await;
-            core.update_order(order_infos, trace_stack);
-        }
-    } else if data.channel == "account" {
-        // let account = standard::handle_info::HandleSwapInfo::handle_account_info(BitgetSpot, data, run_symbol.clone());
-        // {
-        //     let mut core = bot_arc_clone.lock().await;
-        //     core.update_equity(account);
-        // }
-    }
-}
-
-async fn on_public_data(bot_arc_clone: Arc<Mutex<Core>>, update_flag_u: &mut Decimal, max_buy: &mut Decimal, min_sell: &mut Decimal, data: ResponseData) {
-    let mut trace_stack = TraceStack::default();
-
-    trace_stack.on_after_network(data.time);
-    trace_stack.on_before_unlock_core();
-
-    if data.code != "200".to_string() {
-        return;
-    }
-
-    if data.channel == "books5" {
-        trace_stack.on_before_format();
-        let special_depth = standard::handle_info::HandleSwapInfo::handle_special_depth(BitgetSpot, data.clone());
-        trace_stack.on_after_format();
-
-        on_special_depth(bot_arc_clone, update_flag_u, data.label, trace_stack, special_depth).await;
-    } else if data.channel == "trade" {
-        let mut core = bot_arc_clone.lock().await;
-        let str = data.label.clone();
-        if core.is_update.contains_key(&data.label) && *core.is_update.get(str.as_str()).unwrap() {
-            *max_buy = Decimal::ZERO;
-            *min_sell = Decimal::ZERO;
-            core.is_update.remove(str.as_str());
-        }
-        let trades: Vec<OriginalTradeGa> = serde_json::from_str(data.data.as_str()).unwrap();
-        for trade in trades {
-            if trade.price > *max_buy || *max_buy == Decimal::ZERO {
-                *max_buy = trade.price
-            }
-            if trade.price < *min_sell || *min_sell == Decimal::ZERO {
-                *min_sell = trade.price
-            }
-        }
-        core.max_buy_min_sell_cache.insert(data.label, vec![*max_buy, *min_sell]);
-    }
-}
-
-fn parse_btree_map_to_bitget_spot_login(exchange_params: BTreeMap<String, String>) -> BitgetSpotLogin {
-    BitgetSpotLogin {
-        api_key: exchange_params.get("access_key").unwrap().clone(),
-        secret_key: exchange_params.get("secret_key").unwrap().clone(),
-        passphrase_key: exchange_params.get("pass_key").unwrap().clone(),
-    }
-}
+// use std::collections::BTreeMap;
+// use std::sync::Arc;
+// use std::sync::atomic::AtomicBool;
+// use std::time::Duration;
+// use futures_util::StreamExt;
+// use rust_decimal::Decimal;
+// use tokio::spawn;
+// use tokio::sync::Mutex;
+// use tokio::time::sleep;
+// use exchanges::bitget_spot_ws::{BitgetSpotLogin, BitgetSpotSubscribeType, BitgetSpotWs, BitgetSpotWsType};
+// use exchanges::response_base::ResponseData;
+// use global::trace_stack::TraceStack;
+// use standard::exchange::ExchangeEnum::BitgetSpot;
+// use crate::exchange_disguise::on_special_depth;
+// use crate::model::{OrderInfo, OriginalTradeGa};
+// use crate::core::Core;
+//
+// pub async fn bitget_spot_run(bool_v1 :Arc<AtomicBool>,
+//                              is_trade: bool,
+//                              core_arc: Arc<Mutex<Core>>,
+//                              name: String,
+//                              symbols: Vec<String>,
+//                              is_colo: bool,
+//                              exchange_params: BTreeMap<String, String>) {
+//     // 开启公共频道
+//     let (write_tx_public, write_rx_public) = futures_channel::mpsc::unbounded();
+//     let (read_tx_public, mut read_rx_public) = futures_channel::mpsc::unbounded();
+//     let mut ku_public = BitgetSpotWs::new_label(name.clone(),
+//                                                 is_colo,
+//                                                 None,
+//                                                 BitgetSpotWsType::Public);
+//     ku_public.set_symbols(symbols.clone());
+//     // 交易交易所只用订阅深度数据
+//     if is_trade {
+//         ku_public.set_subscribe(vec![
+//             BitgetSpotSubscribeType::PuBooks5
+//         ]);
+//     } else {
+//         // 参考交易所还要订阅实时订单流数据
+//         ku_public.set_subscribe(vec![
+//             BitgetSpotSubscribeType::PuTrade,
+//             BitgetSpotSubscribeType::PuBooks5
+//         ]);
+//     }
+//     // 开启公共连接
+//     let bool_v1_c1 = bool_v1.clone();
+//     let write_tx_am_public = Arc::new(Mutex::new(write_tx_public));
+//     spawn(async move {
+//         ku_public.ws_connect_async(bool_v1_c1,
+//                                    &write_tx_am_public,
+//                                    write_rx_public,
+//                                    read_tx_public)
+//             .await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+//     });
+//     // 消费数据
+//     let bot_arc_clone = core_arc.clone();
+//     spawn(async move {
+//         // ticker
+//         let mut update_flag_u = Decimal::ZERO;
+//         let mut max_buy = Decimal::ZERO;
+//         let mut min_sell = Decimal::ZERO;
+//
+//         loop {
+//             if let Some(data) = read_rx_public.next().await {
+//                 on_public_data(bot_arc_clone.clone(),
+//                         &mut update_flag_u,
+//                         &mut max_buy,
+//                         &mut min_sell,
+//                         data).await;
+//             }
+//         }
+//     });
+//
+//     // 开启私有频道
+//     if is_trade {
+//         // 新增获取余额的协程
+//         let account_core_arc = core_arc.clone();
+//         spawn(async move {
+//             loop {
+//                 // 每30秒重新获取一次
+//                 sleep(Duration::from_secs(30)).await;
+//
+//                 {
+//                     let mut core = account_core_arc.lock().await;
+//                     core.update_equity_rest_spot().await;
+//                 }
+//             }
+//         });
+//
+//         // 其余也要再开两个通道,不能与公共频道混用
+//         let (write_tx_private, write_rx_public) = futures_channel::mpsc::unbounded();
+//         let (read_tx_private, mut read_rx_public) = futures_channel::mpsc::unbounded();
+//         let bool_v1_c2 = bool_v1.clone();
+//         let write_tx_am_private = Arc::new(Mutex::new(write_tx_private));
+//         spawn(async move {
+//             let login_params = parse_btree_map_to_bitget_spot_login(exchange_params);
+//             let mut ku_private = BitgetSpotWs::new_label(name.clone(),
+//                                                          is_colo,
+//                                                          Some(login_params),
+//                                                          BitgetSpotWsType::Private);
+//             ku_private.set_symbols(symbols.clone());
+//             ku_private.set_subscribe(vec![
+//                 BitgetSpotSubscribeType::PrAccount,
+//                 BitgetSpotSubscribeType::PrOrders,
+//             ]);
+//
+//             ku_private.ws_connect_async(bool_v1_c2,
+//                                         &write_tx_am_private,
+//                                         write_rx_public,
+//                                         read_tx_private)
+//                 .await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+//         });
+//
+//         // 消费数据
+//         let bot_arc_clone = core_arc.clone();
+//         spawn(async move {
+//             let ct_val = core_arc.clone().lock().await.platform_rest.get_self_market().ct_val;
+//
+//             loop {
+//                 if let Some(data) = read_rx_public.next().await {
+//                     on_private_data(bot_arc_clone.clone(),
+//                                    ct_val,
+//                                    data).await;
+//                 }
+//             }
+//         });
+//     }
+// }
+//
+// async fn on_private_data(bot_arc_clone: Arc<Mutex<Core>>, ct_val: Decimal, data: ResponseData) {
+//     let mut trace_stack = TraceStack::default();
+//
+//     trace_stack.on_after_network(data.time);
+//     trace_stack.on_before_unlock_core();
+//
+//     if data.code != "200".to_string() {
+//         return;
+//     }
+//
+//     if data.channel == "orders" {
+//         trace_stack.on_before_format();
+//         let orders = standard::handle_info::HandleSwapInfo::handle_order(BitgetSpot, data.clone(), ct_val);
+//         trace_stack.on_after_format();
+//         let mut order_infos:Vec<OrderInfo> = Vec::new();
+//         for order in orders.order {
+//             if order.status == "NULL" {
+//                 continue;
+//             }
+//             let order_info = OrderInfo {
+//                 symbol: "".to_string(),
+//                 amount: order.amount.abs(),
+//                 side: "".to_string(),
+//                 price: order.price,
+//                 client_id: order.custom_id,
+//                 filled_price: order.avg_price,
+//                 filled: order.deal_amount.abs(),
+//                 order_id: order.id,
+//                 local_time: 0,
+//                 create_time: 0,
+//                 status: order.status,
+//                 fee: Default::default(),
+//                 trace_stack: Default::default(),
+//             };
+//             order_infos.push(order_info);
+//         }
+//         {
+//             let mut core = bot_arc_clone.lock().await;
+//             core.update_order(order_infos, trace_stack);
+//         }
+//     } else if data.channel == "account" {
+//         // let account = standard::handle_info::HandleSwapInfo::handle_account_info(BitgetSpot, data, run_symbol.clone());
+//         // {
+//         //     let mut core = bot_arc_clone.lock().await;
+//         //     core.update_equity(account);
+//         // }
+//     }
+// }
+//
+// async fn on_public_data(bot_arc_clone: Arc<Mutex<Core>>, update_flag_u: &mut Decimal, max_buy: &mut Decimal, min_sell: &mut Decimal, data: ResponseData) {
+//     let mut trace_stack = TraceStack::default();
+//
+//     trace_stack.on_after_network(data.time);
+//     trace_stack.on_before_unlock_core();
+//
+//     if data.code != "200".to_string() {
+//         return;
+//     }
+//
+//     if data.channel == "books5" {
+//         trace_stack.on_before_format();
+//         let special_depth = standard::handle_info::HandleSwapInfo::handle_special_depth(BitgetSpot, data.clone());
+//         trace_stack.on_after_format();
+//
+//         on_special_depth(bot_arc_clone, update_flag_u, data.label, trace_stack, special_depth).await;
+//     } else if data.channel == "trade" {
+//         let mut core = bot_arc_clone.lock().await;
+//         let str = data.label.clone();
+//         if core.is_update.contains_key(&data.label) && *core.is_update.get(str.as_str()).unwrap() {
+//             *max_buy = Decimal::ZERO;
+//             *min_sell = Decimal::ZERO;
+//             core.is_update.remove(str.as_str());
+//         }
+//         let trades: Vec<OriginalTradeGa> = serde_json::from_str(data.data.as_str()).unwrap();
+//         for trade in trades {
+//             if trade.price > *max_buy || *max_buy == Decimal::ZERO {
+//                 *max_buy = trade.price
+//             }
+//             if trade.price < *min_sell || *min_sell == Decimal::ZERO {
+//                 *min_sell = trade.price
+//             }
+//         }
+//         core.max_buy_min_sell_cache.insert(data.label, vec![*max_buy, *min_sell]);
+//     }
+// }
+//
+// fn parse_btree_map_to_bitget_spot_login(exchange_params: BTreeMap<String, String>) -> BitgetSpotLogin {
+//     BitgetSpotLogin {
+//         api_key: exchange_params.get("access_key").unwrap().clone(),
+//         secret_key: exchange_params.get("secret_key").unwrap().clone(),
+//         passphrase_key: exchange_params.get("pass_key").unwrap().clone(),
+//     }
+// }

+ 286 - 286
strategy/src/bybit_usdt_swap.rs

@@ -1,286 +1,286 @@
-use std::cmp::Ordering;
-use std::collections::BTreeMap;
-use std::sync::Arc;
-use std::sync::atomic::AtomicBool;
-use std::time::Duration;
-use futures_util::StreamExt;
-use rust_decimal::Decimal;
-use tokio::{spawn, time};
-use tokio::sync::Mutex;
-use exchanges::bybit_swap_ws::{BybitSwapLogin, BybitSwapSubscribeType, BybitSwapWs, BybitSwapWsType};
-use exchanges::response_base::ResponseData;
-use global::trace_stack::TraceStack;
-use standard::exchange::ExchangeEnum::{BybitSwap};
-use standard::handle_info::{DepthParam, format_depth, make_special_depth};
-use standard::MarketOrder;
-use crate::model::{OrderInfo, OriginalTradeBy};
-use crate::core::Core;
-use crate::exchange_disguise::on_special_depth;
-
-// 1交易、0参考 bybit 合约 启动
-pub async fn bybit_swap_run(bool_v1: Arc<AtomicBool>,
-                           is_trade: bool,
-                           _core_arc: Arc<Mutex<Core>>,
-                           name: String,
-                           symbols: Vec<String>,
-                           is_colo: bool,
-                           exchange_params: BTreeMap<String, String>) {
-    // 启动公共频道
-    let (write_tx_public, write_rx_public) = futures_channel::mpsc::unbounded();
-    let (read_tx_public, mut read_rx_public) = futures_channel::mpsc::unbounded();
-
-    let mut ws_public = BybitSwapWs::new_label(name.clone(), is_colo, None, BybitSwapWsType::Public);
-    ws_public.set_symbols(symbols.clone());
-    ws_public.set_subscribe(vec![
-        BybitSwapSubscribeType::PuOrderBook50
-    ]);
-    if is_trade {
-        ws_public.set_subscribe(vec![
-            BybitSwapSubscribeType::PuBlicTrade
-        ]);
-    }
-    // 挂起公共ws
-    let write_tx_am_public = Arc::new(Mutex::new(write_tx_public));
-    let bool_clone_public = Arc::clone(&bool_v1);
-     spawn(async move {
-        ws_public.ws_connect_async(bool_clone_public,
-                                   &write_tx_am_public,
-                                   write_rx_public,
-                                   read_tx_public).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
-    });
-    // 消费数据
-    let bot_arc_clone = _core_arc.clone();
-    // 接收public数据
-    spawn(async move {
-        // ticker
-        let mut update_flag_u = Decimal::ZERO;
-        let mut max_buy = Decimal::ZERO;
-        let mut min_sell = Decimal::ZERO;
-        let mut depth_asks: Vec<MarketOrder> = Vec::new();
-        let mut depth_bids: Vec<MarketOrder> = Vec::new();
-
-        loop {
-            if let Some(public_data) = read_rx_public.next().await {
-                on_public_data(bot_arc_clone.clone(),
-                               &mut update_flag_u,
-                               &mut max_buy,
-                               &mut min_sell,
-                               public_data,
-                               &mut depth_asks,
-                               &mut depth_bids).await;
-            }
-        }
-    });
-    let trade_symbols = symbols.clone();
-    // 交易交易所需要启动私有ws
-    if is_trade {
-        let (write_tx_private, write_rx_private) = futures_channel::mpsc::unbounded();
-        let (read_tx_private, mut read_rx_private) = futures_channel::mpsc::unbounded();
-        let auth = Some(parse_btree_map_to_bybit_swap_login(exchange_params));
-
-        let mut ws_private = BybitSwapWs::new_label(name.clone(), is_colo, auth, BybitSwapWsType::Private);
-        ws_private.set_symbols(trade_symbols);
-        ws_private.set_subscribe(vec![
-            BybitSwapSubscribeType::PrPosition,
-            BybitSwapSubscribeType::PrOrder,
-            BybitSwapSubscribeType::PrWallet
-        ]);
-
-
-        // 挂起私有ws
-        let write_tx_am_private = Arc::new(Mutex::new(write_tx_private));
-        let bool_clone_private = Arc::clone(&bool_v1);
-        spawn(async move {
-            ws_private.ws_connect_async(bool_clone_private,
-                                        &write_tx_am_private,
-                                        write_rx_private,
-                                        read_tx_private).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
-        });
-
-        // 消费数据
-        let bot_arc_clone = _core_arc.clone();
-        // 接收private信息
-        spawn(async move {
-            let ct_val = bot_arc_clone.clone().lock().await.platform_rest.get_self_market().ct_val;
-            let run_symbol = symbols.clone()[0].clone();
-            loop {
-                if let Some(private_data) = read_rx_private.next().await {
-                    on_private_data(bot_arc_clone.clone(),
-                                    ct_val,
-                                    private_data,
-                                    run_symbol.clone()).await;
-                }
-            }
-        });
-
-        // 定时获取仓位信息
-        let position_core_clone = _core_arc.clone();
-        spawn(async move {
-            let mut interval = time::interval(Duration::from_secs(30));
-            loop {
-                interval.tick().await;
-                {
-                    let mut core = position_core_clone.lock().await;
-                    core.update_position_rest_swap().await;
-                }
-
-            }
-        });
-    }
-}
-
-
-
-async fn on_private_data(bot_arc_clone: Arc<Mutex<Core>>, ct_val: Decimal, data: ResponseData, run_symbol: String) {
-    let mut trace_stack = TraceStack::default();
-
-    trace_stack.on_after_network(data.time);
-    trace_stack.on_before_unlock_core();
-
-    if data.code != "200".to_string() {
-        return;
-    }
-    if data.channel == "wallet" {
-        let account = standard::handle_info::HandleSwapInfo::handle_account_info(BybitSwap, data, run_symbol.clone());
-        {
-            let mut core = bot_arc_clone.lock().await;
-            core.update_equity(account).await;
-        }
-    } else if data.channel == "order" {
-        trace_stack.on_before_format();
-        let orders = standard::handle_info::HandleSwapInfo::handle_order(BybitSwap, data.clone(), ct_val.clone());
-        trace_stack.on_after_format();
-
-        let mut order_infos:Vec<OrderInfo> = Vec::new();
-        for order in orders.order {
-            if order.status == "NULL" {
-                continue;
-            }
-            let order_info = OrderInfo {
-                symbol: "".to_string(),
-                amount: order.amount.abs(),
-                side: "".to_string(),
-                price: order.price,
-                client_id: order.custom_id,
-                filled_price: order.avg_price,
-                filled: order.deal_amount.abs(),
-                order_id: order.id,
-                local_time: 0,
-                create_time: 0,
-                status: order.status,
-                fee: Default::default(),
-                trace_stack: Default::default(),
-            };
-            order_infos.push(order_info);
-        }
-
-        {
-            let mut core = bot_arc_clone.lock().await;
-            core.update_order(order_infos, trace_stack);
-        }
-    } else if data.channel == "position" {
-        let positions = standard::handle_info::HandleSwapInfo::handle_position(BybitSwap,data, ct_val.clone());
-        {
-            let mut core = bot_arc_clone.lock().await;
-            core.update_position(positions).await;
-        }
-    }
-}
-
-async fn on_public_data(bot_arc_clone: Arc<Mutex<Core>>, update_flag_u: &mut Decimal, max_buy: &mut Decimal, min_sell: &mut Decimal, data: ResponseData, depth_asks: &mut Vec<MarketOrder>, depth_bids: &mut Vec<MarketOrder>) {
-    let mut trace_stack = TraceStack::default();
-    trace_stack.on_after_network(data.time);
-    trace_stack.on_before_unlock_core();
-
-    if data.code != "200".to_string() {
-        return;
-    }
-    if data.channel == "orderbook" {
-        let mut is_update = false;
-        let data_type = data.data_type.clone();
-        let label = data.label.clone();
-        if data_type == "delta"  {
-            is_update = true;
-        }
-        trace_stack.on_before_format();
-        let mut depth_format: DepthParam = format_depth(BybitSwap, data);
-        // 是增量更新
-        if is_update {
-            update_order_book(depth_asks, depth_bids, depth_format.depth_asks, depth_format.depth_bids);
-        } else { // 全量
-            depth_asks.clear();
-            depth_asks.append(&mut depth_format.depth_asks);
-            depth_bids.clear();
-            depth_bids.append(&mut depth_format.depth_bids);
-
-        }
-        let depth = make_special_depth(label.clone(), depth_asks, depth_bids, depth_format.t, depth_format.create_at);
-        trace_stack.on_before_network(depth_format.create_at.clone());
-        trace_stack.on_after_format();
-
-        on_special_depth(bot_arc_clone, update_flag_u, label, trace_stack, depth).await;
-    } else if data.channel == "trade" {
-        let mut core = bot_arc_clone.lock().await;
-        let str = data.label.clone();
-        if core.is_update.contains_key(&data.label) && *core.is_update.get(str.as_str()).unwrap(){
-            *max_buy = Decimal::ZERO;
-            *min_sell = Decimal::ZERO;
-            core.is_update.remove(str.as_str());
-        }
-        let trades: Vec<OriginalTradeBy> = serde_json::from_str(data.data.as_str()).unwrap();
-        for trade in trades {
-            if trade.p > *max_buy || *max_buy == Decimal::ZERO{
-                *max_buy = trade.p
-            }
-            if trade.p < *min_sell || *min_sell == Decimal::ZERO{
-                *min_sell = trade.p
-            }
-        }
-        core.max_buy_min_sell_cache.insert(data.label, vec![*max_buy, *min_sell]);
-    }
-}
-
-fn parse_btree_map_to_bybit_swap_login(exchange_params: BTreeMap<String, String>) -> BybitSwapLogin {
-    BybitSwapLogin {
-        api_key: exchange_params.get("access_key").unwrap().clone(),
-        secret_key: exchange_params.get("secret_key").unwrap().clone(),
-    }
-}
-fn update_order_book(depth_asks: &mut Vec<MarketOrder>, depth_bids: &mut Vec<MarketOrder>, asks : Vec<MarketOrder>, bids: Vec<MarketOrder>) {
-    for i in asks {
-        let index_of_value = depth_asks.iter().position(|x| x.price == i.price);
-        match index_of_value {
-            Some(index) => {
-                if i.amount == Decimal::ZERO {
-                    depth_asks.remove(index);
-                } else {
-                    depth_asks[index].amount = i.amount.clone();
-                }
-            },
-            None => {
-                depth_asks.push(i.clone());
-            },
-        }
-    }
-    for i in bids {
-        let index_of_value = depth_bids.iter().position(|x| x.price == i.price);
-        match index_of_value {
-            Some(index) => {
-                if i.amount == Decimal::ZERO {
-                    depth_bids.remove(index);
-                } else {
-                    depth_bids[index].amount = i.amount.clone();
-                }
-            },
-            None => {
-                depth_bids.push(i.clone());
-            },
-        }
-    }
-    depth_asks.sort_by(|a, b| a.price.partial_cmp(&b.price).unwrap_or(Ordering::Equal));
-    depth_bids.sort_by(|a, b| b.price.partial_cmp(&a.price).unwrap_or(Ordering::Equal));
-
-    // 限制总长度100
-    depth_asks.truncate(100);
-    depth_bids.truncate(100);
-}
+// use std::cmp::Ordering;
+// use std::collections::BTreeMap;
+// use std::sync::Arc;
+// use std::sync::atomic::AtomicBool;
+// use std::time::Duration;
+// use futures_util::StreamExt;
+// use rust_decimal::Decimal;
+// use tokio::{spawn, time};
+// use tokio::sync::Mutex;
+// use exchanges::bybit_swap_ws::{BybitSwapLogin, BybitSwapSubscribeType, BybitSwapWs, BybitSwapWsType};
+// use exchanges::response_base::ResponseData;
+// use global::trace_stack::TraceStack;
+// use standard::exchange::ExchangeEnum::{BybitSwap};
+// use standard::handle_info::{DepthParam, format_depth, make_special_depth};
+// use standard::MarketOrder;
+// use crate::model::{OrderInfo, OriginalTradeBy};
+// use crate::core::Core;
+// use crate::exchange_disguise::on_special_depth;
+//
+// // 1交易、0参考 bybit 合约 启动
+// pub async fn bybit_swap_run(bool_v1: Arc<AtomicBool>,
+//                            is_trade: bool,
+//                            _core_arc: Arc<Mutex<Core>>,
+//                            name: String,
+//                            symbols: Vec<String>,
+//                            is_colo: bool,
+//                            exchange_params: BTreeMap<String, String>) {
+//     // 启动公共频道
+//     let (write_tx_public, write_rx_public) = futures_channel::mpsc::unbounded();
+//     let (read_tx_public, mut read_rx_public) = futures_channel::mpsc::unbounded();
+//
+//     let mut ws_public = BybitSwapWs::new_label(name.clone(), is_colo, None, BybitSwapWsType::Public);
+//     ws_public.set_symbols(symbols.clone());
+//     ws_public.set_subscribe(vec![
+//         BybitSwapSubscribeType::PuOrderBook50
+//     ]);
+//     if is_trade {
+//         ws_public.set_subscribe(vec![
+//             BybitSwapSubscribeType::PuBlicTrade
+//         ]);
+//     }
+//     // 挂起公共ws
+//     let write_tx_am_public = Arc::new(Mutex::new(write_tx_public));
+//     let bool_clone_public = Arc::clone(&bool_v1);
+//      spawn(async move {
+//         ws_public.ws_connect_async(bool_clone_public,
+//                                    &write_tx_am_public,
+//                                    write_rx_public,
+//                                    read_tx_public).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+//     });
+//     // 消费数据
+//     let bot_arc_clone = _core_arc.clone();
+//     // 接收public数据
+//     spawn(async move {
+//         // ticker
+//         let mut update_flag_u = Decimal::ZERO;
+//         let mut max_buy = Decimal::ZERO;
+//         let mut min_sell = Decimal::ZERO;
+//         let mut depth_asks: Vec<MarketOrder> = Vec::new();
+//         let mut depth_bids: Vec<MarketOrder> = Vec::new();
+//
+//         loop {
+//             if let Some(public_data) = read_rx_public.next().await {
+//                 on_public_data(bot_arc_clone.clone(),
+//                                &mut update_flag_u,
+//                                &mut max_buy,
+//                                &mut min_sell,
+//                                public_data,
+//                                &mut depth_asks,
+//                                &mut depth_bids).await;
+//             }
+//         }
+//     });
+//     let trade_symbols = symbols.clone();
+//     // 交易交易所需要启动私有ws
+//     if is_trade {
+//         let (write_tx_private, write_rx_private) = futures_channel::mpsc::unbounded();
+//         let (read_tx_private, mut read_rx_private) = futures_channel::mpsc::unbounded();
+//         let auth = Some(parse_btree_map_to_bybit_swap_login(exchange_params));
+//
+//         let mut ws_private = BybitSwapWs::new_label(name.clone(), is_colo, auth, BybitSwapWsType::Private);
+//         ws_private.set_symbols(trade_symbols);
+//         ws_private.set_subscribe(vec![
+//             BybitSwapSubscribeType::PrPosition,
+//             BybitSwapSubscribeType::PrOrder,
+//             BybitSwapSubscribeType::PrWallet
+//         ]);
+//
+//
+//         // 挂起私有ws
+//         let write_tx_am_private = Arc::new(Mutex::new(write_tx_private));
+//         let bool_clone_private = Arc::clone(&bool_v1);
+//         spawn(async move {
+//             ws_private.ws_connect_async(bool_clone_private,
+//                                         &write_tx_am_private,
+//                                         write_rx_private,
+//                                         read_tx_private).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+//         });
+//
+//         // 消费数据
+//         let bot_arc_clone = _core_arc.clone();
+//         // 接收private信息
+//         spawn(async move {
+//             let ct_val = bot_arc_clone.clone().lock().await.platform_rest.get_self_market().ct_val;
+//             let run_symbol = symbols.clone()[0].clone();
+//             loop {
+//                 if let Some(private_data) = read_rx_private.next().await {
+//                     on_private_data(bot_arc_clone.clone(),
+//                                     ct_val,
+//                                     private_data,
+//                                     run_symbol.clone()).await;
+//                 }
+//             }
+//         });
+//
+//         // 定时获取仓位信息
+//         let position_core_clone = _core_arc.clone();
+//         spawn(async move {
+//             let mut interval = time::interval(Duration::from_secs(30));
+//             loop {
+//                 interval.tick().await;
+//                 {
+//                     let mut core = position_core_clone.lock().await;
+//                     core.update_position_rest_swap().await;
+//                 }
+//
+//             }
+//         });
+//     }
+// }
+//
+//
+//
+// async fn on_private_data(bot_arc_clone: Arc<Mutex<Core>>, ct_val: Decimal, data: ResponseData, run_symbol: String) {
+//     let mut trace_stack = TraceStack::default();
+//
+//     trace_stack.on_after_network(data.time);
+//     trace_stack.on_before_unlock_core();
+//
+//     if data.code != "200".to_string() {
+//         return;
+//     }
+//     if data.channel == "wallet" {
+//         let account = standard::handle_info::HandleSwapInfo::handle_account_info(BybitSwap, data, run_symbol.clone());
+//         {
+//             let mut core = bot_arc_clone.lock().await;
+//             core.update_equity(account).await;
+//         }
+//     } else if data.channel == "order" {
+//         trace_stack.on_before_format();
+//         let orders = standard::handle_info::HandleSwapInfo::handle_order(BybitSwap, data.clone(), ct_val.clone());
+//         trace_stack.on_after_format();
+//
+//         let mut order_infos:Vec<OrderInfo> = Vec::new();
+//         for order in orders.order {
+//             if order.status == "NULL" {
+//                 continue;
+//             }
+//             let order_info = OrderInfo {
+//                 symbol: "".to_string(),
+//                 amount: order.amount.abs(),
+//                 side: "".to_string(),
+//                 price: order.price,
+//                 client_id: order.custom_id,
+//                 filled_price: order.avg_price,
+//                 filled: order.deal_amount.abs(),
+//                 order_id: order.id,
+//                 local_time: 0,
+//                 create_time: 0,
+//                 status: order.status,
+//                 fee: Default::default(),
+//                 trace_stack: Default::default(),
+//             };
+//             order_infos.push(order_info);
+//         }
+//
+//         {
+//             let mut core = bot_arc_clone.lock().await;
+//             core.update_order(order_infos, trace_stack);
+//         }
+//     } else if data.channel == "position" {
+//         let positions = standard::handle_info::HandleSwapInfo::handle_position(BybitSwap,data, ct_val.clone());
+//         {
+//             let mut core = bot_arc_clone.lock().await;
+//             core.update_position(positions).await;
+//         }
+//     }
+// }
+//
+// async fn on_public_data(bot_arc_clone: Arc<Mutex<Core>>, update_flag_u: &mut Decimal, max_buy: &mut Decimal, min_sell: &mut Decimal, data: ResponseData, depth_asks: &mut Vec<MarketOrder>, depth_bids: &mut Vec<MarketOrder>) {
+//     let mut trace_stack = TraceStack::default();
+//     trace_stack.on_after_network(data.time);
+//     trace_stack.on_before_unlock_core();
+//
+//     if data.code != "200".to_string() {
+//         return;
+//     }
+//     if data.channel == "orderbook" {
+//         let mut is_update = false;
+//         let data_type = data.data_type.clone();
+//         let label = data.label.clone();
+//         if data_type == "delta"  {
+//             is_update = true;
+//         }
+//         trace_stack.on_before_format();
+//         let mut depth_format: DepthParam = format_depth(BybitSwap, data);
+//         // 是增量更新
+//         if is_update {
+//             update_order_book(depth_asks, depth_bids, depth_format.depth_asks, depth_format.depth_bids);
+//         } else { // 全量
+//             depth_asks.clear();
+//             depth_asks.append(&mut depth_format.depth_asks);
+//             depth_bids.clear();
+//             depth_bids.append(&mut depth_format.depth_bids);
+//
+//         }
+//         let depth = make_special_depth(label.clone(), depth_asks, depth_bids, depth_format.t, depth_format.create_at);
+//         trace_stack.on_before_network(depth_format.create_at.clone());
+//         trace_stack.on_after_format();
+//
+//         on_special_depth(bot_arc_clone, update_flag_u, label, trace_stack, depth).await;
+//     } else if data.channel == "trade" {
+//         let mut core = bot_arc_clone.lock().await;
+//         let str = data.label.clone();
+//         if core.is_update.contains_key(&data.label) && *core.is_update.get(str.as_str()).unwrap(){
+//             *max_buy = Decimal::ZERO;
+//             *min_sell = Decimal::ZERO;
+//             core.is_update.remove(str.as_str());
+//         }
+//         let trades: Vec<OriginalTradeBy> = serde_json::from_str(data.data.as_str()).unwrap();
+//         for trade in trades {
+//             if trade.p > *max_buy || *max_buy == Decimal::ZERO{
+//                 *max_buy = trade.p
+//             }
+//             if trade.p < *min_sell || *min_sell == Decimal::ZERO{
+//                 *min_sell = trade.p
+//             }
+//         }
+//         core.max_buy_min_sell_cache.insert(data.label, vec![*max_buy, *min_sell]);
+//     }
+// }
+//
+// fn parse_btree_map_to_bybit_swap_login(exchange_params: BTreeMap<String, String>) -> BybitSwapLogin {
+//     BybitSwapLogin {
+//         api_key: exchange_params.get("access_key").unwrap().clone(),
+//         secret_key: exchange_params.get("secret_key").unwrap().clone(),
+//     }
+// }
+// fn update_order_book(depth_asks: &mut Vec<MarketOrder>, depth_bids: &mut Vec<MarketOrder>, asks : Vec<MarketOrder>, bids: Vec<MarketOrder>) {
+//     for i in asks {
+//         let index_of_value = depth_asks.iter().position(|x| x.price == i.price);
+//         match index_of_value {
+//             Some(index) => {
+//                 if i.amount == Decimal::ZERO {
+//                     depth_asks.remove(index);
+//                 } else {
+//                     depth_asks[index].amount = i.amount.clone();
+//                 }
+//             },
+//             None => {
+//                 depth_asks.push(i.clone());
+//             },
+//         }
+//     }
+//     for i in bids {
+//         let index_of_value = depth_bids.iter().position(|x| x.price == i.price);
+//         match index_of_value {
+//             Some(index) => {
+//                 if i.amount == Decimal::ZERO {
+//                     depth_bids.remove(index);
+//                 } else {
+//                     depth_bids[index].amount = i.amount.clone();
+//                 }
+//             },
+//             None => {
+//                 depth_bids.push(i.clone());
+//             },
+//         }
+//     }
+//     depth_asks.sort_by(|a, b| a.price.partial_cmp(&b.price).unwrap_or(Ordering::Equal));
+//     depth_bids.sort_by(|a, b| b.price.partial_cmp(&a.price).unwrap_or(Ordering::Equal));
+//
+//     // 限制总长度100
+//     depth_asks.truncate(100);
+//     depth_bids.truncate(100);
+// }

+ 39 - 39
strategy/src/exchange_disguise.rs

@@ -5,14 +5,14 @@ use rust_decimal::Decimal;
 use tokio::sync::Mutex;
 use global::trace_stack::TraceStack;
 use standard::SpecialDepth;
-use crate::binance_spot::reference_binance_spot_run;
+// use crate::binance_spot::reference_binance_spot_run;
 use crate::binance_usdt_swap::reference_binance_swap_run;
-use crate::bitget_spot::bitget_spot_run;
-use crate::bybit_usdt_swap::bybit_swap_run;
+// use crate::bitget_spot::bitget_spot_run;
+// use crate::bybit_usdt_swap::bybit_swap_run;
 use crate::gate_swap::gate_swap_run;
-use crate::kucoin_spot::kucoin_spot_run;
-use crate::kucoin_swap::kucoin_swap_run;
-use crate::okx_usdt_swap::okex_swap_run;
+// use crate::kucoin_spot::kucoin_spot_run;
+// use crate::kucoin_swap::kucoin_swap_run;
+// use crate::okx_usdt_swap::okex_swap_run;
 use crate::core::Core;
 
 // 交易交易所启动
@@ -27,18 +27,18 @@ pub async fn run_transactional_exchange(bool_v1 :Arc<AtomicBool>,
         "gate_usdt_swap" => {
             gate_swap_run(bool_v1, true, core_arc, name, symbols, is_colo, exchange_params).await;
         }
-        "kucoin_usdt_swap" => {
-            kucoin_swap_run(bool_v1, true, core_arc, name, symbols, is_colo, exchange_params).await;
-        },
-        "okex_usdt_swap" => {
-            okex_swap_run(bool_v1,true, core_arc, name, symbols, is_colo, exchange_params).await;
-        },
-        "bitget_spot" => {
-            bitget_spot_run(bool_v1,true, core_arc, name, symbols, is_colo, exchange_params).await;
-        },
-        "bybit_usdt_swap" => {
-            bybit_swap_run(bool_v1,true, core_arc, name, symbols, is_colo, exchange_params).await;
-        }
+        // "kucoin_usdt_swap" => {
+        //     kucoin_swap_run(bool_v1, true, core_arc, name, symbols, is_colo, exchange_params).await;
+        // },
+        // "okex_usdt_swap" => {
+        //     okex_swap_run(bool_v1,true, core_arc, name, symbols, is_colo, exchange_params).await;
+        // },
+        // "bitget_spot" => {
+        //     bitget_spot_run(bool_v1,true, core_arc, name, symbols, is_colo, exchange_params).await;
+        // },
+        // "bybit_usdt_swap" => {
+        //     bybit_swap_run(bool_v1,true, core_arc, name, symbols, is_colo, exchange_params).await;
+        // }
         _ => {
             let msg = format!("不支持的交易交易所:{}", exchange_name);
             panic!("{}", msg);
@@ -58,27 +58,27 @@ pub async fn run_reference_exchange(bool_v1: Arc<AtomicBool>,
         "binance_usdt_swap" => {
             reference_binance_swap_run(bool_v1, core_arc, name, symbols, is_colo, exchange_params).await;
         },
-        "binance_spot" => {
-            reference_binance_spot_run(bool_v1, core_arc, name, symbols, is_colo, exchange_params).await;
-        },
-        "gate_usdt_swap" => {
-            gate_swap_run(bool_v1, false, core_arc, name, symbols, is_colo, exchange_params).await;
-        },
-        "okex_usdt_swap" => {
-            okex_swap_run(bool_v1, false, core_arc, name, symbols, is_colo, exchange_params).await;
-        },
-        "kucoin_usdt_swap" => {
-            kucoin_swap_run(bool_v1, false, core_arc, name, symbols, is_colo, exchange_params).await;
-        },
-        "kucoin_spot" => {
-            kucoin_spot_run(bool_v1, false, core_arc, name, symbols, is_colo, exchange_params).await;
-        },
-        "bitget_spot" => {
-            bitget_spot_run(bool_v1, false, core_arc, name, symbols, is_colo, exchange_params).await;
-        },
-        "bybit_usdt_swap" => {
-            bybit_swap_run(bool_v1, false, core_arc, name, symbols, is_colo, exchange_params).await;
-        },
+        // "binance_spot" => {
+        //     reference_binance_spot_run(bool_v1, core_arc, name, symbols, is_colo, exchange_params).await;
+        // },
+        // "gate_usdt_swap" => {
+        //     gate_swap_run(bool_v1, false, core_arc, name, symbols, is_colo, exchange_params).await;
+        // },
+        // "okex_usdt_swap" => {
+        //     okex_swap_run(bool_v1, false, core_arc, name, symbols, is_colo, exchange_params).await;
+        // },
+        // "kucoin_usdt_swap" => {
+        //     kucoin_swap_run(bool_v1, false, core_arc, name, symbols, is_colo, exchange_params).await;
+        // },
+        // "kucoin_spot" => {
+        //     kucoin_spot_run(bool_v1, false, core_arc, name, symbols, is_colo, exchange_params).await;
+        // },
+        // "bitget_spot" => {
+        //     bitget_spot_run(bool_v1, false, core_arc, name, symbols, is_colo, exchange_params).await;
+        // },
+        // "bybit_usdt_swap" => {
+        //     bybit_swap_run(bool_v1, false, core_arc, name, symbols, is_colo, exchange_params).await;
+        // },
         _ => {
             let msg = format!("不支持的参考交易所:{}", exchange_name);
             panic!("{}", msg);

+ 0 - 0
strategy/src/gp_predictor.rs


+ 162 - 162
strategy/src/kucoin_spot.rs

@@ -1,162 +1,162 @@
-use std::collections::BTreeMap;
-use std::sync::Arc;
-use std::sync::atomic::AtomicBool;
-use futures_util::StreamExt;
-use rust_decimal::Decimal;
-use tokio::sync::Mutex;
-use exchanges::kucoin_spot_ws::{KucoinSpotSubscribeType, KucoinSpotWs, KucoinSpotWsType};
-use exchanges::response_base::ResponseData;
-use global::trace_stack::TraceStack;
-use standard::exchange::ExchangeEnum::KucoinSpot;
-use crate::exchange_disguise::on_special_depth;
-use crate::model::OriginalTradeGa;
-use crate::core::Core;
-
-// 1交易、0参考 kucoin 现货 启动
-pub async fn kucoin_spot_run(bool_v1: Arc<AtomicBool>,
-                             _is_trade: bool,
-                             core_arc: Arc<Mutex<Core>>,
-                             name: String,
-                             symbols: Vec<String>,
-                             is_colo: bool,
-                             _exchange_params: BTreeMap<String, String>) {
-    let mut symbol_arr = Vec::new();
-    for symbol in symbols {
-        let symbol_mapper = standard::utils::symbol_enter_mapper(KucoinSpot, symbol.as_str());
-        let new_symbol = symbol_mapper.replace("_", "-").to_uppercase();
-        symbol_arr.push(new_symbol);
-    }
-
-    let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
-    let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded();
-
-    let mut ws = KucoinSpotWs::new_label(name.clone(), is_colo, None, KucoinSpotWsType::Public).await;
-    ws.set_symbols(symbol_arr);
-    ws.set_subscribe(vec![
-        KucoinSpotSubscribeType::PuSpotMarketLevel2Depth50,
-        // KucoinSpotSubscribeType::PuMarketTicker,         // python说:订阅 ticker来的很慢
-        KucoinSpotSubscribeType::PuMarketMatch,
-    ]);
-
-    // 开启ws
-    let write_tx_am = Arc::new(Mutex::new(write_tx));
-    tokio::spawn(async move {
-        //链接
-        let bool_v3_clone = Arc::clone(&bool_v1);
-        ws.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
-    });
-    //读取
-    // let bool_v1_clone = Arc::clone(&bool_v1);
-    tokio::spawn(async move {
-        let bot_arc_clone = Arc::clone(&core_arc);
-        // trade
-        let mut update_flag_u = Decimal::ZERO;
-        let mut max_buy = Decimal::ZERO;
-        let mut min_sell = Decimal::ZERO;
-        let multiplier = Decimal::ONE;
-        // let multiplier = bot_arc_clone.lock().await.platform_rest.get_self_market().ct_val;
-        // let run_symbol = symbols.clone()[0].clone();
-
-        loop {
-            if let Some(data) = read_rx.next().await {
-                on_kucoin_spot_data(bot_arc_clone.clone(),
-                                    &mut update_flag_u,
-                                    multiplier,
-                                    &mut max_buy,
-                                    &mut min_sell,
-                                    data).await;
-            }
-        }
-    });
-}
-
-async fn on_kucoin_spot_data(bot_arc_clone: Arc<Mutex<Core>>,
-                             update_flag_u: &mut Decimal,
-                             _multiplier: Decimal,
-                             max_buy: &mut Decimal,
-                             min_sell: &mut Decimal,
-                             data: ResponseData) {
-    let mut trace_stack = TraceStack::default();
-    trace_stack.on_after_network(data.time);
-    trace_stack.on_before_unlock_core();
-
-    if data.code != "200".to_string() {
-        return;
-    }
-
-    if data.channel == "level2" {
-        trace_stack.on_before_format();
-        let special_depth = standard::handle_info::HandleSwapInfo::handle_special_depth(KucoinSpot, data.clone());
-        trace_stack.on_before_network(special_depth.create_at.clone());
-        trace_stack.on_after_format();
-
-        on_special_depth(bot_arc_clone, update_flag_u, data.label, trace_stack, special_depth).await;
-    } else if data.channel == "trade.ticker" {
-        trace_stack.on_before_format();
-        let special_depth = standard::handle_info::HandleSwapInfo::handle_special_ticker(KucoinSpot, data.clone());
-        trace_stack.on_before_network(special_depth.create_at.clone());
-        trace_stack.on_after_format();
-
-        on_special_depth(bot_arc_clone, update_flag_u, data.label, trace_stack, special_depth).await;
-    }  else if data.channel == "trade.l3match" {
-        let mut core = bot_arc_clone.lock().await;
-        let str = data.label.clone();
-        if core.is_update.contains_key(&data.label) && *core.is_update.get(str.as_str()).unwrap() {
-            *max_buy = Decimal::ZERO;
-            *min_sell = Decimal::ZERO;
-            core.is_update.remove(str.as_str());
-        }
-        let trade: OriginalTradeGa = serde_json::from_str(data.data.as_str()).unwrap();
-        if trade.price > *max_buy || *max_buy == Decimal::ZERO {
-            *max_buy = trade.price
-        }
-        if trade.price < *min_sell || *min_sell == Decimal::ZERO {
-            *min_sell = trade.price
-        }
-        core.max_buy_min_sell_cache.insert(data.label, vec![*max_buy, *min_sell]);
-    } else if data.channel == "availableBalance.change" {
-        // 取消原有推送解析,因为推送的信息不准确
-        // let account = standard::handle_info::HandleSwapInfo::handle_account_info(KucoinSwap, data, run_symbol.clone());
-        // {
-        //     let mut core = bot_arc_clone.lock().await;
-        //     core.update_equity(account);
-        // }
-    } else if data.channel == "symbolOrderChange" {
-        // trace_stack.on_before_format();
-        // let orders = standard::handle_info::HandleSwapInfo::handle_order(KucoinSwap, data.clone(), multiplier);
-        // trace_stack.on_after_format();
-        // let mut order_infos:Vec<OrderInfo> = Vec::new();
-        // for order in orders.order {
-        //     if order.status == "NULL" {
-        //         continue;
-        //     }
-        //     let order_info = OrderInfo {
-        //         symbol: "".to_string(),
-        //         amount: order.amount.abs(),
-        //         side: "".to_string(),
-        //         price: order.price,
-        //         client_id: order.custom_id,
-        //         filled_price: order.avg_price,
-        //         filled: order.deal_amount.abs(),
-        //         order_id: order.id,
-        //         local_time: 0,
-        //         create_time: 0,
-        //         status: order.status,
-        //         fee: Default::default(),
-        //         trace_stack: Default::default(),
-        //     };
-        //     order_infos.push(order_info);
-        // }
-        //
-        // {
-        //     let mut core = bot_arc_clone.lock().await;
-        //     core.update_order(order_infos, trace_stack);
-        // }
-    } else if data.channel == "position.change" {
-        // let positions = standard::handle_info::HandleSwapInfo::handle_position(KucoinSwap,data, multiplier);
-        // {
-        //     let mut core = bot_arc_clone.lock().await;
-        //     core.update_position(positions);
-        // }
-    }
-}
+// use std::collections::BTreeMap;
+// use std::sync::Arc;
+// use std::sync::atomic::AtomicBool;
+// use futures_util::StreamExt;
+// use rust_decimal::Decimal;
+// use tokio::sync::Mutex;
+// use exchanges::kucoin_spot_ws::{KucoinSpotSubscribeType, KucoinSpotWs, KucoinSpotWsType};
+// use exchanges::response_base::ResponseData;
+// use global::trace_stack::TraceStack;
+// use standard::exchange::ExchangeEnum::KucoinSpot;
+// use crate::exchange_disguise::on_special_depth;
+// use crate::model::OriginalTradeGa;
+// use crate::core::Core;
+//
+// // 1交易、0参考 kucoin 现货 启动
+// pub async fn kucoin_spot_run(bool_v1: Arc<AtomicBool>,
+//                              _is_trade: bool,
+//                              core_arc: Arc<Mutex<Core>>,
+//                              name: String,
+//                              symbols: Vec<String>,
+//                              is_colo: bool,
+//                              _exchange_params: BTreeMap<String, String>) {
+//     let mut symbol_arr = Vec::new();
+//     for symbol in symbols {
+//         let symbol_mapper = standard::utils::symbol_enter_mapper(KucoinSpot, symbol.as_str());
+//         let new_symbol = symbol_mapper.replace("_", "-").to_uppercase();
+//         symbol_arr.push(new_symbol);
+//     }
+//
+//     let (write_tx, write_rx) = futures_channel::mpsc::unbounded();
+//     let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded();
+//
+//     let mut ws = KucoinSpotWs::new_label(name.clone(), is_colo, None, KucoinSpotWsType::Public).await;
+//     ws.set_symbols(symbol_arr);
+//     ws.set_subscribe(vec![
+//         KucoinSpotSubscribeType::PuSpotMarketLevel2Depth50,
+//         // KucoinSpotSubscribeType::PuMarketTicker,         // python说:订阅 ticker来的很慢
+//         KucoinSpotSubscribeType::PuMarketMatch,
+//     ]);
+//
+//     // 开启ws
+//     let write_tx_am = Arc::new(Mutex::new(write_tx));
+//     tokio::spawn(async move {
+//         //链接
+//         let bool_v3_clone = Arc::clone(&bool_v1);
+//         ws.ws_connect_async(bool_v3_clone, &write_tx_am, write_rx, read_tx).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+//     });
+//     //读取
+//     // let bool_v1_clone = Arc::clone(&bool_v1);
+//     tokio::spawn(async move {
+//         let bot_arc_clone = Arc::clone(&core_arc);
+//         // trade
+//         let mut update_flag_u = Decimal::ZERO;
+//         let mut max_buy = Decimal::ZERO;
+//         let mut min_sell = Decimal::ZERO;
+//         let multiplier = Decimal::ONE;
+//         // let multiplier = bot_arc_clone.lock().await.platform_rest.get_self_market().ct_val;
+//         // let run_symbol = symbols.clone()[0].clone();
+//
+//         loop {
+//             if let Some(data) = read_rx.next().await {
+//                 on_kucoin_spot_data(bot_arc_clone.clone(),
+//                                     &mut update_flag_u,
+//                                     multiplier,
+//                                     &mut max_buy,
+//                                     &mut min_sell,
+//                                     data).await;
+//             }
+//         }
+//     });
+// }
+//
+// async fn on_kucoin_spot_data(bot_arc_clone: Arc<Mutex<Core>>,
+//                              update_flag_u: &mut Decimal,
+//                              _multiplier: Decimal,
+//                              max_buy: &mut Decimal,
+//                              min_sell: &mut Decimal,
+//                              data: ResponseData) {
+//     let mut trace_stack = TraceStack::default();
+//     trace_stack.on_after_network(data.time);
+//     trace_stack.on_before_unlock_core();
+//
+//     if data.code != "200".to_string() {
+//         return;
+//     }
+//
+//     if data.channel == "level2" {
+//         trace_stack.on_before_format();
+//         let special_depth = standard::handle_info::HandleSwapInfo::handle_special_depth(KucoinSpot, data.clone());
+//         trace_stack.on_before_network(special_depth.create_at.clone());
+//         trace_stack.on_after_format();
+//
+//         on_special_depth(bot_arc_clone, update_flag_u, data.label, trace_stack, special_depth).await;
+//     } else if data.channel == "trade.ticker" {
+//         trace_stack.on_before_format();
+//         let special_depth = standard::handle_info::HandleSwapInfo::handle_special_ticker(KucoinSpot, data.clone());
+//         trace_stack.on_before_network(special_depth.create_at.clone());
+//         trace_stack.on_after_format();
+//
+//         on_special_depth(bot_arc_clone, update_flag_u, data.label, trace_stack, special_depth).await;
+//     }  else if data.channel == "trade.l3match" {
+//         let mut core = bot_arc_clone.lock().await;
+//         let str = data.label.clone();
+//         if core.is_update.contains_key(&data.label) && *core.is_update.get(str.as_str()).unwrap() {
+//             *max_buy = Decimal::ZERO;
+//             *min_sell = Decimal::ZERO;
+//             core.is_update.remove(str.as_str());
+//         }
+//         let trade: OriginalTradeGa = serde_json::from_str(data.data.as_str()).unwrap();
+//         if trade.price > *max_buy || *max_buy == Decimal::ZERO {
+//             *max_buy = trade.price
+//         }
+//         if trade.price < *min_sell || *min_sell == Decimal::ZERO {
+//             *min_sell = trade.price
+//         }
+//         core.max_buy_min_sell_cache.insert(data.label, vec![*max_buy, *min_sell]);
+//     } else if data.channel == "availableBalance.change" {
+//         // 取消原有推送解析,因为推送的信息不准确
+//         // let account = standard::handle_info::HandleSwapInfo::handle_account_info(KucoinSwap, data, run_symbol.clone());
+//         // {
+//         //     let mut core = bot_arc_clone.lock().await;
+//         //     core.update_equity(account);
+//         // }
+//     } else if data.channel == "symbolOrderChange" {
+//         // trace_stack.on_before_format();
+//         // let orders = standard::handle_info::HandleSwapInfo::handle_order(KucoinSwap, data.clone(), multiplier);
+//         // trace_stack.on_after_format();
+//         // let mut order_infos:Vec<OrderInfo> = Vec::new();
+//         // for order in orders.order {
+//         //     if order.status == "NULL" {
+//         //         continue;
+//         //     }
+//         //     let order_info = OrderInfo {
+//         //         symbol: "".to_string(),
+//         //         amount: order.amount.abs(),
+//         //         side: "".to_string(),
+//         //         price: order.price,
+//         //         client_id: order.custom_id,
+//         //         filled_price: order.avg_price,
+//         //         filled: order.deal_amount.abs(),
+//         //         order_id: order.id,
+//         //         local_time: 0,
+//         //         create_time: 0,
+//         //         status: order.status,
+//         //         fee: Default::default(),
+//         //         trace_stack: Default::default(),
+//         //     };
+//         //     order_infos.push(order_info);
+//         // }
+//         //
+//         // {
+//         //     let mut core = bot_arc_clone.lock().await;
+//         //     core.update_order(order_infos, trace_stack);
+//         // }
+//     } else if data.channel == "position.change" {
+//         // let positions = standard::handle_info::HandleSwapInfo::handle_position(KucoinSwap,data, multiplier);
+//         // {
+//         //     let mut core = bot_arc_clone.lock().await;
+//         //     core.update_position(positions);
+//         // }
+//     }
+// }

+ 206 - 206
strategy/src/kucoin_swap.rs

@@ -1,206 +1,206 @@
-use std::collections::{BTreeMap};
-use std::sync::Arc;
-use std::sync::atomic::{AtomicBool};
-use std::time::Duration;
-use futures_util::StreamExt;
-
-use rust_decimal::Decimal;
-use tokio_tungstenite::tungstenite::Message;
-use tokio::spawn;
-use tokio::sync::Mutex;
-use tokio::time::sleep;
-
-use exchanges::kucoin_swap_ws::{KucoinSwapLogin, KucoinSwapSubscribeType, KucoinSwapWs, KucoinSwapWsType};
-use exchanges::response_base::ResponseData;
-use global::trace_stack::TraceStack;
-use standard::exchange::ExchangeEnum::KucoinSwap;
-use crate::exchange_disguise::on_special_depth;
-
-use crate::model::{OrderInfo, OriginalTradeGa};
-use crate::core::Core;
-
-// 1交易、0参考 kucoin 合约 启动
-pub async fn kucoin_swap_run(bool_v1 :Arc<AtomicBool>,
-                             is_trade: bool,
-                             core_arc: Arc<Mutex<Core>>,
-                             name: String,
-                             symbols: Vec<String>,
-                             is_colo: bool,
-                             exchange_params: BTreeMap<String, String>) {
-    let symbols_clone = symbols.clone();
-    let mut symbol_arr = Vec::new();
-    for symbol in symbols_clone {
-        let symbol_mapper = standard::utils::symbol_enter_mapper(KucoinSwap,symbol.as_str());
-        let new_symbol = symbol_mapper.replace("_", "").to_uppercase() + "M";
-        symbol_arr.push(new_symbol);
-    }
-
-    // 新增定期获取余额的协程
-    let account_core_arc = core_arc.clone();
-    spawn(async move {
-        loop {
-            // 每30秒重新获取一次
-            sleep(Duration::from_secs(30)).await;
-
-            {
-                let mut core = account_core_arc.lock().await;
-                core.update_equity_rest_swap().await;
-            }
-        }
-    });
-
-    spawn(async move {
-        //创建读写通道
-        let (write_tx, write_rx) = futures_channel::mpsc::unbounded::<Message>();
-        let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded();
-
-        spawn( async move {
-            let mut kucoin_exc;
-            //模拟业务场景 开启链接
-            let bool_v1_clone = Arc::clone(&bool_v1);
-            let write_tx_am = Arc::new(Mutex::new(write_tx));
-
-            // 交易
-            if is_trade {
-                let login_params = parse_btree_map_to_kucoin_swap_login(exchange_params);
-                kucoin_exc = KucoinSwapWs::new_label(name.clone(), is_colo, Option::from(login_params), KucoinSwapWsType::Private).await;
-                kucoin_exc.set_subscribe(vec![
-                    KucoinSwapSubscribeType::PuContractMarketLevel2Depth50,
-                    // KucoinSwapSubscribeType::PuContractMarkettickerV2,
-                    // KucoinSwapSubscribeType::PrContractAccountWallet,
-                    KucoinSwapSubscribeType::PrContractPosition,
-                    KucoinSwapSubscribeType::PrContractMarketTradeOrders
-                ]);
-            } else { // 参考
-                kucoin_exc = KucoinSwapWs::new_label(name.clone(), is_colo, None, KucoinSwapWsType::Public).await;
-                kucoin_exc.set_subscribe(vec![
-                    KucoinSwapSubscribeType::PuContractMarketLevel2Depth50,
-                    // python注释掉了
-                    // KucoinSwapSubscribeType::PuContractMarkettickerV2,
-                    KucoinSwapSubscribeType::PuContractMarketExecution
-                ]);
-            }
-
-            kucoin_exc.set_symbols(symbol_arr);
-            kucoin_exc.ws_connect_async(bool_v1_clone, &write_tx_am, write_rx, read_tx).await.unwrap();
-        });
-
-        // 数据处理协程
-        spawn(async move {
-            let bot_arc_clone = Arc::clone(&core_arc);
-
-            // ticker
-            let mut update_flag_u = Decimal::ZERO;
-            let mut max_buy = Decimal::ZERO;
-            let mut min_sell = Decimal::ZERO;
-            let multiplier = bot_arc_clone.lock().await.platform_rest.get_self_market().ct_val;
-
-            loop {
-                if let Some(data) = read_rx.next().await {
-                    on_data(bot_arc_clone.clone(),
-                            &mut update_flag_u,
-                            multiplier,
-                            &mut max_buy,
-                            &mut min_sell,
-                            data).await;
-                }
-            }
-        });
-    });
-}
-
-async fn on_data(bot_arc_clone: Arc<Mutex<Core>>,
-                 update_flag_u: &mut Decimal,
-                 multiplier: Decimal,
-                 max_buy: &mut Decimal,
-                 min_sell: &mut Decimal,
-                 data: ResponseData) {
-    let mut trace_stack = TraceStack::default();
-    trace_stack.on_after_network(data.time);
-    trace_stack.on_before_unlock_core();
-
-    if data.code != "200".to_string() {
-        return;
-    }
-    if data.channel == "level2" {
-        trace_stack.on_before_format();
-        let special_depth = standard::handle_info::HandleSwapInfo::handle_special_depth(KucoinSwap,data.clone());
-        trace_stack.on_before_network(special_depth.create_at.clone());
-        trace_stack.on_after_format();
-
-        on_special_depth(bot_arc_clone, update_flag_u, data.label, trace_stack, special_depth).await
-    } else if data.channel == "tickerV2" {
-        let special_depth = standard::handle_info::HandleSwapInfo::handle_special_ticker(KucoinSwap, data.clone());
-        trace_stack.on_before_network(special_depth.create_at.clone());
-
-        on_special_depth(bot_arc_clone, update_flag_u, data.label, trace_stack, special_depth).await
-    } else if data.channel == "availableBalance.change" {
-        // 取消原有推送解析,因为推送的信息不准确
-        // let account = standard::handle_info::HandleSwapInfo::handle_account_info(KucoinSwap, data, run_symbol.clone());
-        // {
-        //     let mut core = bot_arc_clone.lock().await;
-        //     core.update_equity(account);
-        // }
-    } else if data.channel == "symbolOrderChange" {
-        trace_stack.on_before_format();
-        let orders = standard::handle_info::HandleSwapInfo::handle_order(KucoinSwap, data.clone(), multiplier);
-        trace_stack.on_after_format();
-        let mut order_infos:Vec<OrderInfo> = Vec::new();
-        for order in orders.order {
-            if order.status == "NULL" {
-                continue;
-            }
-            let order_info = OrderInfo {
-                symbol: "".to_string(),
-                amount: order.amount.abs(),
-                side: "".to_string(),
-                price: order.price,
-                client_id: order.custom_id,
-                filled_price: order.avg_price,
-                filled: order.deal_amount.abs(),
-                order_id: order.id,
-                local_time: 0,
-                create_time: 0,
-                status: order.status,
-                fee: Default::default(),
-                trace_stack: Default::default(),
-            };
-            order_infos.push(order_info);
-        }
-
-        {
-            let mut core = bot_arc_clone.lock().await;
-            core.update_order(order_infos, trace_stack);
-        }
-    } else if data.channel == "position.change" {
-        let positions = standard::handle_info::HandleSwapInfo::handle_position(KucoinSwap,data, multiplier);
-        {
-            let mut core = bot_arc_clone.lock().await;
-            core.update_position(positions).await;
-        }
-    } else if data.channel == "match" {
-        let mut core = bot_arc_clone.lock().await;
-        let str = data.label.clone();
-        if core.is_update.contains_key(&data.label) && *core.is_update.get(str.as_str()).unwrap() {
-            *max_buy = Decimal::ZERO;
-            *min_sell = Decimal::ZERO;
-            core.is_update.remove(str.as_str());
-        }
-        let trade: OriginalTradeGa = serde_json::from_str(data.data.as_str()).unwrap();
-        if trade.price > *max_buy || *max_buy == Decimal::ZERO {
-            *max_buy = trade.price
-        }
-        if trade.price < *min_sell || *min_sell == Decimal::ZERO {
-            *min_sell = trade.price
-        }
-        core.max_buy_min_sell_cache.insert(data.label, vec![*max_buy, *min_sell]);
-    }
-}
-
-fn parse_btree_map_to_kucoin_swap_login(exchange_params: BTreeMap<String, String>) -> KucoinSwapLogin {
-    KucoinSwapLogin {
-        access_key: exchange_params.get("access_key").unwrap().clone(),
-        secret_key: exchange_params.get("secret_key").unwrap().clone(),
-        pass_key: exchange_params.get("pass_key").unwrap().clone(),
-    }
-}
+// use std::collections::{BTreeMap};
+// use std::sync::Arc;
+// use std::sync::atomic::{AtomicBool};
+// use std::time::Duration;
+// use futures_util::StreamExt;
+//
+// use rust_decimal::Decimal;
+// use tokio_tungstenite::tungstenite::Message;
+// use tokio::spawn;
+// use tokio::sync::Mutex;
+// use tokio::time::sleep;
+//
+// use exchanges::kucoin_swap_ws::{KucoinSwapLogin, KucoinSwapSubscribeType, KucoinSwapWs, KucoinSwapWsType};
+// use exchanges::response_base::ResponseData;
+// use global::trace_stack::TraceStack;
+// use standard::exchange::ExchangeEnum::KucoinSwap;
+// use crate::exchange_disguise::on_special_depth;
+//
+// use crate::model::{OrderInfo, OriginalTradeGa};
+// use crate::core::Core;
+//
+// // 1交易、0参考 kucoin 合约 启动
+// pub async fn kucoin_swap_run(bool_v1 :Arc<AtomicBool>,
+//                              is_trade: bool,
+//                              core_arc: Arc<Mutex<Core>>,
+//                              name: String,
+//                              symbols: Vec<String>,
+//                              is_colo: bool,
+//                              exchange_params: BTreeMap<String, String>) {
+//     let symbols_clone = symbols.clone();
+//     let mut symbol_arr = Vec::new();
+//     for symbol in symbols_clone {
+//         let symbol_mapper = standard::utils::symbol_enter_mapper(KucoinSwap,symbol.as_str());
+//         let new_symbol = symbol_mapper.replace("_", "").to_uppercase() + "M";
+//         symbol_arr.push(new_symbol);
+//     }
+//
+//     // 新增定期获取余额的协程
+//     let account_core_arc = core_arc.clone();
+//     spawn(async move {
+//         loop {
+//             // 每30秒重新获取一次
+//             sleep(Duration::from_secs(30)).await;
+//
+//             {
+//                 let mut core = account_core_arc.lock().await;
+//                 core.update_equity_rest_swap().await;
+//             }
+//         }
+//     });
+//
+//     spawn(async move {
+//         //创建读写通道
+//         let (write_tx, write_rx) = futures_channel::mpsc::unbounded::<Message>();
+//         let (read_tx, mut read_rx) = futures_channel::mpsc::unbounded();
+//
+//         spawn( async move {
+//             let mut kucoin_exc;
+//             //模拟业务场景 开启链接
+//             let bool_v1_clone = Arc::clone(&bool_v1);
+//             let write_tx_am = Arc::new(Mutex::new(write_tx));
+//
+//             // 交易
+//             if is_trade {
+//                 let login_params = parse_btree_map_to_kucoin_swap_login(exchange_params);
+//                 kucoin_exc = KucoinSwapWs::new_label(name.clone(), is_colo, Option::from(login_params), KucoinSwapWsType::Private).await;
+//                 kucoin_exc.set_subscribe(vec![
+//                     KucoinSwapSubscribeType::PuContractMarketLevel2Depth50,
+//                     // KucoinSwapSubscribeType::PuContractMarkettickerV2,
+//                     // KucoinSwapSubscribeType::PrContractAccountWallet,
+//                     KucoinSwapSubscribeType::PrContractPosition,
+//                     KucoinSwapSubscribeType::PrContractMarketTradeOrders
+//                 ]);
+//             } else { // 参考
+//                 kucoin_exc = KucoinSwapWs::new_label(name.clone(), is_colo, None, KucoinSwapWsType::Public).await;
+//                 kucoin_exc.set_subscribe(vec![
+//                     KucoinSwapSubscribeType::PuContractMarketLevel2Depth50,
+//                     // python注释掉了
+//                     // KucoinSwapSubscribeType::PuContractMarkettickerV2,
+//                     KucoinSwapSubscribeType::PuContractMarketExecution
+//                 ]);
+//             }
+//
+//             kucoin_exc.set_symbols(symbol_arr);
+//             kucoin_exc.ws_connect_async(bool_v1_clone, &write_tx_am, write_rx, read_tx).await.unwrap();
+//         });
+//
+//         // 数据处理协程
+//         spawn(async move {
+//             let bot_arc_clone = Arc::clone(&core_arc);
+//
+//             // ticker
+//             let mut update_flag_u = Decimal::ZERO;
+//             let mut max_buy = Decimal::ZERO;
+//             let mut min_sell = Decimal::ZERO;
+//             let multiplier = bot_arc_clone.lock().await.platform_rest.get_self_market().ct_val;
+//
+//             loop {
+//                 if let Some(data) = read_rx.next().await {
+//                     on_data(bot_arc_clone.clone(),
+//                             &mut update_flag_u,
+//                             multiplier,
+//                             &mut max_buy,
+//                             &mut min_sell,
+//                             data).await;
+//                 }
+//             }
+//         });
+//     });
+// }
+//
+// async fn on_data(bot_arc_clone: Arc<Mutex<Core>>,
+//                  update_flag_u: &mut Decimal,
+//                  multiplier: Decimal,
+//                  max_buy: &mut Decimal,
+//                  min_sell: &mut Decimal,
+//                  data: ResponseData) {
+//     let mut trace_stack = TraceStack::default();
+//     trace_stack.on_after_network(data.time);
+//     trace_stack.on_before_unlock_core();
+//
+//     if data.code != "200".to_string() {
+//         return;
+//     }
+//     if data.channel == "level2" {
+//         trace_stack.on_before_format();
+//         let special_depth = standard::handle_info::HandleSwapInfo::handle_special_depth(KucoinSwap,data.clone());
+//         trace_stack.on_before_network(special_depth.create_at.clone());
+//         trace_stack.on_after_format();
+//
+//         on_special_depth(bot_arc_clone, update_flag_u, data.label, trace_stack, special_depth).await
+//     } else if data.channel == "tickerV2" {
+//         let special_depth = standard::handle_info::HandleSwapInfo::handle_special_ticker(KucoinSwap, data.clone());
+//         trace_stack.on_before_network(special_depth.create_at.clone());
+//
+//         on_special_depth(bot_arc_clone, update_flag_u, data.label, trace_stack, special_depth).await
+//     } else if data.channel == "availableBalance.change" {
+//         // 取消原有推送解析,因为推送的信息不准确
+//         // let account = standard::handle_info::HandleSwapInfo::handle_account_info(KucoinSwap, data, run_symbol.clone());
+//         // {
+//         //     let mut core = bot_arc_clone.lock().await;
+//         //     core.update_equity(account);
+//         // }
+//     } else if data.channel == "symbolOrderChange" {
+//         trace_stack.on_before_format();
+//         let orders = standard::handle_info::HandleSwapInfo::handle_order(KucoinSwap, data.clone(), multiplier);
+//         trace_stack.on_after_format();
+//         let mut order_infos:Vec<OrderInfo> = Vec::new();
+//         for order in orders.order {
+//             if order.status == "NULL" {
+//                 continue;
+//             }
+//             let order_info = OrderInfo {
+//                 symbol: "".to_string(),
+//                 amount: order.amount.abs(),
+//                 side: "".to_string(),
+//                 price: order.price,
+//                 client_id: order.custom_id,
+//                 filled_price: order.avg_price,
+//                 filled: order.deal_amount.abs(),
+//                 order_id: order.id,
+//                 local_time: 0,
+//                 create_time: 0,
+//                 status: order.status,
+//                 fee: Default::default(),
+//                 trace_stack: Default::default(),
+//             };
+//             order_infos.push(order_info);
+//         }
+//
+//         {
+//             let mut core = bot_arc_clone.lock().await;
+//             core.update_order(order_infos, trace_stack);
+//         }
+//     } else if data.channel == "position.change" {
+//         let positions = standard::handle_info::HandleSwapInfo::handle_position(KucoinSwap,data, multiplier);
+//         {
+//             let mut core = bot_arc_clone.lock().await;
+//             core.update_position(positions).await;
+//         }
+//     } else if data.channel == "match" {
+//         let mut core = bot_arc_clone.lock().await;
+//         let str = data.label.clone();
+//         if core.is_update.contains_key(&data.label) && *core.is_update.get(str.as_str()).unwrap() {
+//             *max_buy = Decimal::ZERO;
+//             *min_sell = Decimal::ZERO;
+//             core.is_update.remove(str.as_str());
+//         }
+//         let trade: OriginalTradeGa = serde_json::from_str(data.data.as_str()).unwrap();
+//         if trade.price > *max_buy || *max_buy == Decimal::ZERO {
+//             *max_buy = trade.price
+//         }
+//         if trade.price < *min_sell || *min_sell == Decimal::ZERO {
+//             *min_sell = trade.price
+//         }
+//         core.max_buy_min_sell_cache.insert(data.label, vec![*max_buy, *min_sell]);
+//     }
+// }
+//
+// fn parse_btree_map_to_kucoin_swap_login(exchange_params: BTreeMap<String, String>) -> KucoinSwapLogin {
+//     KucoinSwapLogin {
+//         access_key: exchange_params.get("access_key").unwrap().clone(),
+//         secret_key: exchange_params.get("secret_key").unwrap().clone(),
+//         pass_key: exchange_params.get("pass_key").unwrap().clone(),
+//     }
+// }

+ 0 - 1
strategy/src/lib.rs

@@ -4,7 +4,6 @@ mod strategy;
 mod predictor;
 mod utils;
 pub mod exchange_disguise;
-mod gp_predictor;
 mod binance_usdt_swap;
 mod binance_spot;
 mod gate_swap;

+ 208 - 208
strategy/src/okx_usdt_swap.rs

@@ -1,208 +1,208 @@
-use std::collections::BTreeMap;
-use std::sync::Arc;
-use std::sync::atomic::AtomicBool;
-use futures_util::StreamExt;
-use rust_decimal::Decimal;
-use tokio::sync::Mutex;
-use exchanges::okx_swap_ws::{OkxSwapLogin, OkxSwapSubscribeType, OkxSwapWs, OkxSwapWsType};
-use exchanges::response_base::ResponseData;
-use global::trace_stack::TraceStack;
-use standard::exchange::ExchangeEnum::OkxSwap;
-use crate::exchange_disguise::on_special_depth;
-use crate::model::{OrderInfo, OriginalTradeOK};
-use crate::core::Core;
-
-pub async fn okex_swap_run(bool_v1: Arc<AtomicBool>,
-                           is_trade: bool,
-                           _core_arc: Arc<Mutex<Core>>,
-                           name: String,
-                           symbols: Vec<String>,
-                           is_colo: bool,
-                           exchange_params: BTreeMap<String, String>) {
-    // 启动公共频道
-    let (write_tx_public, write_rx_public) = futures_channel::mpsc::unbounded();
-    let (read_tx_public, mut read_rx_public) = futures_channel::mpsc::unbounded();
-
-    let mut ws_public = OkxSwapWs::new_label(name.clone(), is_colo, None, OkxSwapWsType::Public);
-    ws_public.set_symbols(symbols.clone());
-    if is_trade {
-        ws_public.set_subscribe(vec![
-            OkxSwapSubscribeType::PuBooks5
-        ])
-    } else {
-        ws_public.set_subscribe(vec![
-            OkxSwapSubscribeType::PuBooks50L2tbt
-        ])
-    }
-    // 挂起公共ws
-    let write_tx_am_public = Arc::new(Mutex::new(write_tx_public));
-    let bool_clone_public = Arc::clone(&bool_v1);
-    tokio::spawn(async move {
-        ws_public.ws_connect_async(bool_clone_public,
-                                   &write_tx_am_public,
-                                   write_rx_public,
-                                   read_tx_public).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
-    });
-    // 消费数据
-    let bot_arc_clone = _core_arc.clone();
-    // 接收public数据
-    tokio::spawn(async move {
-        // ticker
-        let mut update_flag_u = Decimal::ZERO;
-        let mut max_buy = Decimal::ZERO;
-        let mut min_sell = Decimal::ZERO;
-
-        loop {
-            if let Some(public_data) = read_rx_public.next().await {
-                on_public_data(bot_arc_clone.clone(),
-                               &mut update_flag_u,
-                               &mut max_buy,
-                               &mut min_sell,
-                               public_data).await;
-            }
-        }
-    });
-
-    // 交易交易所需要启动私有ws
-    if is_trade {
-        let (write_tx_private, write_rx_private) = futures_channel::mpsc::unbounded();
-        let (read_tx_private, mut read_rx_private) = futures_channel::mpsc::unbounded();
-        let auth = Some(parse_btree_map_to_okx_swap_login(exchange_params));
-
-        let mut ws_private = OkxSwapWs::new_label(name.clone(), is_colo, auth, OkxSwapWsType::Private);
-        ws_private.set_symbols(symbols.clone());
-        ws_private.set_subscribe(vec![
-            OkxSwapSubscribeType::PrBalanceAndPosition,
-            OkxSwapSubscribeType::PrAccount("USDT".to_string()),
-            OkxSwapSubscribeType::PrOrders
-        ]);
-
-
-        // 挂起私有ws
-        let write_tx_am_private = Arc::new(Mutex::new(write_tx_private));
-        let bool_clone_private = Arc::clone(&bool_v1);
-        tokio::spawn(async move {
-            ws_private.ws_connect_async(bool_clone_private,
-                                &write_tx_am_private,
-                                write_rx_private,
-                                read_tx_private).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
-        });
-
-        // 消费数据
-        let bot_arc_clone = _core_arc.clone();
-        // 接收private信息
-        tokio::spawn(async move {
-            let ct_val = _core_arc.clone().lock().await.platform_rest.get_self_market().ct_val;
-            let run_symbol = symbols.clone()[0].clone();
-            loop {
-                if let Some(private_data) = read_rx_private.next().await {
-                    on_private_data(bot_arc_clone.clone(),
-                                    ct_val,
-                                    private_data,
-                                    run_symbol.clone()).await;
-                }
-            }
-        });
-    }
-}
-
-async fn on_private_data(bot_arc_clone: Arc<Mutex<Core>>, ct_val: Decimal, data: ResponseData, run_symbol: String) {
-    let mut trace_stack = TraceStack::default();
-
-    trace_stack.on_after_network(data.time);
-    trace_stack.on_before_unlock_core();
-
-    if data.code != "200".to_string() {
-        return;
-    }
-    if data.channel == "orders" {
-        trace_stack.on_before_format();
-        let orders = standard::handle_info::HandleSwapInfo::handle_order(OkxSwap, data.clone(), ct_val);
-        trace_stack.on_after_format();
-        let mut order_infos:Vec<OrderInfo> = Vec::new();
-        for order in orders.order {
-            if order.status == "NULL" {
-                continue;
-            }
-            let order_info = OrderInfo {
-                symbol: "".to_string(),
-                amount: order.amount.abs(),
-                side: "".to_string(),
-                price: order.price,
-                client_id: order.custom_id,
-                filled_price: order.avg_price,
-                filled: order.deal_amount.abs(),
-                order_id: order.id,
-                local_time: 0,
-                create_time: 0,
-                status: order.status,
-                fee: Default::default(),
-                trace_stack: Default::default(),
-            };
-            order_infos.push(order_info);
-        }
-        {
-            let mut core = bot_arc_clone.lock().await;
-            core.update_order(order_infos, trace_stack);
-        }
-    } else if data.channel == "balance_and_position" {
-        let positions = standard::handle_info::HandleSwapInfo::handle_position(OkxSwap,data, ct_val);
-        {
-            let mut core = bot_arc_clone.lock().await;
-            core.update_position(positions).await;
-        }
-    } else if data.channel == "account" {
-        let account = standard::handle_info::HandleSwapInfo::handle_account_info(OkxSwap, data.clone(), run_symbol.clone());
-        {
-            let mut core = bot_arc_clone.lock().await;
-            core.update_equity(account).await;
-        }
-    }
-}
-
-async fn on_public_data(bot_arc_clone: Arc<Mutex<Core>>, update_flag_u: &mut Decimal, max_buy: &mut Decimal, min_sell: &mut Decimal, data: ResponseData) {
-    let mut trace_stack = TraceStack::default();
-    trace_stack.on_after_network(data.time);
-    trace_stack.on_before_unlock_core();
-
-    if data.code != "200".to_string() {
-        return;
-    }
-    if data.channel == "tickers" {
-        trace_stack.on_before_format();
-        let special_depth = standard::handle_info::HandleSwapInfo::handle_special_depth(OkxSwap, data.clone());
-        trace_stack.on_after_format();
-        on_special_depth(bot_arc_clone, update_flag_u, data.label, trace_stack, special_depth).await;
-    } else if data.channel == "trades" {
-        let mut core = bot_arc_clone.lock().await;
-        let str = data.label.clone();
-        if core.is_update.contains_key(&data.label) && *core.is_update.get(str.as_str()).unwrap(){
-            *max_buy = Decimal::ZERO;
-            *min_sell = Decimal::ZERO;
-            core.is_update.remove(str.as_str());
-        }
-        let trades: Vec<OriginalTradeOK> = serde_json::from_str(data.data.as_str()).unwrap();
-        for trade in trades {
-            if trade.px > *max_buy || *max_buy == Decimal::ZERO{
-                *max_buy = trade.px
-            }
-            if trade.px < *min_sell || *min_sell == Decimal::ZERO{
-                *min_sell = trade.px
-            }
-        }
-        core.max_buy_min_sell_cache.insert(data.label, vec![*max_buy, *min_sell]);
-    } else if data.channel == "books5" {
-        trace_stack.on_before_format();
-        let special_depth = standard::handle_info::HandleSwapInfo::handle_special_depth(OkxSwap, data.clone());
-        trace_stack.on_after_format();
-        on_special_depth(bot_arc_clone, update_flag_u, data.label, trace_stack, special_depth).await;
-    }
-}
-
-fn parse_btree_map_to_okx_swap_login(exchange_params: BTreeMap<String, String>) -> OkxSwapLogin {
-    OkxSwapLogin {
-        api_key: exchange_params.get("access_key").unwrap().clone(),
-        secret_key: exchange_params.get("secret_key").unwrap().clone(),
-        passphrase: exchange_params.get("pass_key").unwrap().clone(),
-    }
-}
+// use std::collections::BTreeMap;
+// use std::sync::Arc;
+// use std::sync::atomic::AtomicBool;
+// use futures_util::StreamExt;
+// use rust_decimal::Decimal;
+// use tokio::sync::Mutex;
+// use exchanges::okx_swap_ws::{OkxSwapLogin, OkxSwapSubscribeType, OkxSwapWs, OkxSwapWsType};
+// use exchanges::response_base::ResponseData;
+// use global::trace_stack::TraceStack;
+// use standard::exchange::ExchangeEnum::OkxSwap;
+// use crate::exchange_disguise::on_special_depth;
+// use crate::model::{OrderInfo, OriginalTradeOK};
+// use crate::core::Core;
+//
+// pub async fn okex_swap_run(bool_v1: Arc<AtomicBool>,
+//                            is_trade: bool,
+//                            _core_arc: Arc<Mutex<Core>>,
+//                            name: String,
+//                            symbols: Vec<String>,
+//                            is_colo: bool,
+//                            exchange_params: BTreeMap<String, String>) {
+//     // 启动公共频道
+//     let (write_tx_public, write_rx_public) = futures_channel::mpsc::unbounded();
+//     let (read_tx_public, mut read_rx_public) = futures_channel::mpsc::unbounded();
+//
+//     let mut ws_public = OkxSwapWs::new_label(name.clone(), is_colo, None, OkxSwapWsType::Public);
+//     ws_public.set_symbols(symbols.clone());
+//     if is_trade {
+//         ws_public.set_subscribe(vec![
+//             OkxSwapSubscribeType::PuBooks5
+//         ])
+//     } else {
+//         ws_public.set_subscribe(vec![
+//             OkxSwapSubscribeType::PuBooks50L2tbt
+//         ])
+//     }
+//     // 挂起公共ws
+//     let write_tx_am_public = Arc::new(Mutex::new(write_tx_public));
+//     let bool_clone_public = Arc::clone(&bool_v1);
+//     tokio::spawn(async move {
+//         ws_public.ws_connect_async(bool_clone_public,
+//                                    &write_tx_am_public,
+//                                    write_rx_public,
+//                                    read_tx_public).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+//     });
+//     // 消费数据
+//     let bot_arc_clone = _core_arc.clone();
+//     // 接收public数据
+//     tokio::spawn(async move {
+//         // ticker
+//         let mut update_flag_u = Decimal::ZERO;
+//         let mut max_buy = Decimal::ZERO;
+//         let mut min_sell = Decimal::ZERO;
+//
+//         loop {
+//             if let Some(public_data) = read_rx_public.next().await {
+//                 on_public_data(bot_arc_clone.clone(),
+//                                &mut update_flag_u,
+//                                &mut max_buy,
+//                                &mut min_sell,
+//                                public_data).await;
+//             }
+//         }
+//     });
+//
+//     // 交易交易所需要启动私有ws
+//     if is_trade {
+//         let (write_tx_private, write_rx_private) = futures_channel::mpsc::unbounded();
+//         let (read_tx_private, mut read_rx_private) = futures_channel::mpsc::unbounded();
+//         let auth = Some(parse_btree_map_to_okx_swap_login(exchange_params));
+//
+//         let mut ws_private = OkxSwapWs::new_label(name.clone(), is_colo, auth, OkxSwapWsType::Private);
+//         ws_private.set_symbols(symbols.clone());
+//         ws_private.set_subscribe(vec![
+//             OkxSwapSubscribeType::PrBalanceAndPosition,
+//             OkxSwapSubscribeType::PrAccount("USDT".to_string()),
+//             OkxSwapSubscribeType::PrOrders
+//         ]);
+//
+//
+//         // 挂起私有ws
+//         let write_tx_am_private = Arc::new(Mutex::new(write_tx_private));
+//         let bool_clone_private = Arc::clone(&bool_v1);
+//         tokio::spawn(async move {
+//             ws_private.ws_connect_async(bool_clone_private,
+//                                 &write_tx_am_private,
+//                                 write_rx_private,
+//                                 read_tx_private).await.expect("链接失败(内部一个心跳线程应该已经关闭了)");
+//         });
+//
+//         // 消费数据
+//         let bot_arc_clone = _core_arc.clone();
+//         // 接收private信息
+//         tokio::spawn(async move {
+//             let ct_val = _core_arc.clone().lock().await.platform_rest.get_self_market().ct_val;
+//             let run_symbol = symbols.clone()[0].clone();
+//             loop {
+//                 if let Some(private_data) = read_rx_private.next().await {
+//                     on_private_data(bot_arc_clone.clone(),
+//                                     ct_val,
+//                                     private_data,
+//                                     run_symbol.clone()).await;
+//                 }
+//             }
+//         });
+//     }
+// }
+//
+// async fn on_private_data(bot_arc_clone: Arc<Mutex<Core>>, ct_val: Decimal, data: ResponseData, run_symbol: String) {
+//     let mut trace_stack = TraceStack::default();
+//
+//     trace_stack.on_after_network(data.time);
+//     trace_stack.on_before_unlock_core();
+//
+//     if data.code != "200".to_string() {
+//         return;
+//     }
+//     if data.channel == "orders" {
+//         trace_stack.on_before_format();
+//         let orders = standard::handle_info::HandleSwapInfo::handle_order(OkxSwap, data.clone(), ct_val);
+//         trace_stack.on_after_format();
+//         let mut order_infos:Vec<OrderInfo> = Vec::new();
+//         for order in orders.order {
+//             if order.status == "NULL" {
+//                 continue;
+//             }
+//             let order_info = OrderInfo {
+//                 symbol: "".to_string(),
+//                 amount: order.amount.abs(),
+//                 side: "".to_string(),
+//                 price: order.price,
+//                 client_id: order.custom_id,
+//                 filled_price: order.avg_price,
+//                 filled: order.deal_amount.abs(),
+//                 order_id: order.id,
+//                 local_time: 0,
+//                 create_time: 0,
+//                 status: order.status,
+//                 fee: Default::default(),
+//                 trace_stack: Default::default(),
+//             };
+//             order_infos.push(order_info);
+//         }
+//         {
+//             let mut core = bot_arc_clone.lock().await;
+//             core.update_order(order_infos, trace_stack);
+//         }
+//     } else if data.channel == "balance_and_position" {
+//         let positions = standard::handle_info::HandleSwapInfo::handle_position(OkxSwap,data, ct_val);
+//         {
+//             let mut core = bot_arc_clone.lock().await;
+//             core.update_position(positions).await;
+//         }
+//     } else if data.channel == "account" {
+//         let account = standard::handle_info::HandleSwapInfo::handle_account_info(OkxSwap, data.clone(), run_symbol.clone());
+//         {
+//             let mut core = bot_arc_clone.lock().await;
+//             core.update_equity(account).await;
+//         }
+//     }
+// }
+//
+// async fn on_public_data(bot_arc_clone: Arc<Mutex<Core>>, update_flag_u: &mut Decimal, max_buy: &mut Decimal, min_sell: &mut Decimal, data: ResponseData) {
+//     let mut trace_stack = TraceStack::default();
+//     trace_stack.on_after_network(data.time);
+//     trace_stack.on_before_unlock_core();
+//
+//     if data.code != "200".to_string() {
+//         return;
+//     }
+//     if data.channel == "tickers" {
+//         trace_stack.on_before_format();
+//         let special_depth = standard::handle_info::HandleSwapInfo::handle_special_depth(OkxSwap, data.clone());
+//         trace_stack.on_after_format();
+//         on_special_depth(bot_arc_clone, update_flag_u, data.label, trace_stack, special_depth).await;
+//     } else if data.channel == "trades" {
+//         let mut core = bot_arc_clone.lock().await;
+//         let str = data.label.clone();
+//         if core.is_update.contains_key(&data.label) && *core.is_update.get(str.as_str()).unwrap(){
+//             *max_buy = Decimal::ZERO;
+//             *min_sell = Decimal::ZERO;
+//             core.is_update.remove(str.as_str());
+//         }
+//         let trades: Vec<OriginalTradeOK> = serde_json::from_str(data.data.as_str()).unwrap();
+//         for trade in trades {
+//             if trade.px > *max_buy || *max_buy == Decimal::ZERO{
+//                 *max_buy = trade.px
+//             }
+//             if trade.px < *min_sell || *min_sell == Decimal::ZERO{
+//                 *min_sell = trade.px
+//             }
+//         }
+//         core.max_buy_min_sell_cache.insert(data.label, vec![*max_buy, *min_sell]);
+//     } else if data.channel == "books5" {
+//         trace_stack.on_before_format();
+//         let special_depth = standard::handle_info::HandleSwapInfo::handle_special_depth(OkxSwap, data.clone());
+//         trace_stack.on_after_format();
+//         on_special_depth(bot_arc_clone, update_flag_u, data.label, trace_stack, special_depth).await;
+//     }
+// }
+//
+// fn parse_btree_map_to_okx_swap_login(exchange_params: BTreeMap<String, String>) -> OkxSwapLogin {
+//     OkxSwapLogin {
+//         api_key: exchange_params.get("access_key").unwrap().clone(),
+//         secret_key: exchange_params.get("secret_key").unwrap().clone(),
+//         passphrase: exchange_params.get("pass_key").unwrap().clone(),
+//     }
+// }