Browse Source

转为gate测试。

skyffire 1 year ago
parent
commit
1390a7279a
2 changed files with 56 additions and 34 deletions
  1. 30 27
      kappa/data_processing.py
  2. 26 7
      kappa/ws_client.py

+ 30 - 27
kappa/data_processing.py

@@ -43,38 +43,39 @@ def get_tick_size_from_prices(ask_price, bid_price):
 def on_message_trade(_ws, message):
     global trade_snapshots
     json_message = json.loads(message)
-    trade = {
-        'price': float(json_message['data']['p']),
-        'qty': float(json_message['data']['q']),
-        'timestamp': pd.to_datetime(json_message['data']['T'], unit='ms'),
-        'side': 'sell' if json_message['data']['m'] else 'buy'
-    }
-    trade_snapshots.append(trade)
-    process_depth_data()
+    logger.debug(json_message)
+    # trade = {
+    #     'price': float(json_message['data']['p']),
+    #     'qty': float(json_message['data']['q']),
+    #     'timestamp': pd.to_datetime(json_message['data']['T'], unit='ms'),
+    #     'side': 'sell' if json_message['data']['m'] else 'buy'
+    # }
+    # trade_snapshots.append(trade)
+    # process_depth_data()
 
 
 def on_message_depth(_ws, message):
     global order_book_snapshots, spread_delta_snapshots
     json_message = json.loads(message)
-    bids = [[float(price), float(quantity)] for price, quantity in json_message['data']['b'][:10]]
-    asks = [[float(price), float(quantity)] for price, quantity in json_message['data']['a'][:10]]
-    timestamp = pd.to_datetime(json_message['data']['E'], unit='ms')
-    depth = {
-        'bids': bids,
-        'asks': asks,
-        'timestamp': timestamp
-    }
-    order_book_snapshots.append(depth)
-
-    # 求价差
-    ask_price = Decimal(str(asks[0][0]))
-    bid_price = Decimal(str(bids[0][0]))
-    tick_size = get_tick_size_from_prices(ask_price, bid_price)
-    spread = float(ask_price - bid_price)
-    spread_delta = int(spread / tick_size)
-    spread_delta_snapshots.append(spread_delta)
-
-    process_depth_data()
+    # bids = [[float(price), float(quantity)] for price, quantity in json_message['data']['b'][:10]]
+    # asks = [[float(price), float(quantity)] for price, quantity in json_message['data']['a'][:10]]
+    # timestamp = pd.to_datetime(json_message['data']['E'], unit='ms')
+    # depth = {
+    #     'bids': bids,
+    #     'asks': asks,
+    #     'timestamp': timestamp
+    # }
+    # order_book_snapshots.append(depth)
+    #
+    # # 求价差
+    # ask_price = Decimal(str(asks[0][0]))
+    # bid_price = Decimal(str(bids[0][0]))
+    # tick_size = get_tick_size_from_prices(ask_price, bid_price)
+    # spread = float(ask_price - bid_price)
+    # spread_delta = int(spread / tick_size)
+    # spread_delta_snapshots.append(spread_delta)
+    #
+    # process_depth_data()
 
 
 def calculate_phi(prices, k, S0):
@@ -217,6 +218,8 @@ def process_depth_data():
     if result.success:
         A_initial, k_initial = result.x
         # logger.info(f"Optimal k: {k_initial}, delta_max: {delta_max}")
+    else:
+        logger.error(result)
 
     # ========================== 计算 σ^2 ==========================
     sigma_squared = calculate_sigma_squared(S_values, order_book_timestamps)

+ 26 - 7
kappa/ws_client.py

@@ -1,3 +1,4 @@
+import json
 import traceback
 
 import websocket
@@ -7,9 +8,23 @@ from logger_config import logger
 from data_processing import on_message_depth, on_message_trade, stop_event
 
 # Binance WebSocket API URL
-SYMBOL = "zro" + "usdt"
-SOCKET_TRADE = "wss://fstream.binance.com/stream?streams=" + SYMBOL + "@trade"
-SOCKET_DEPTH = "wss://fstream.binance.com/stream?streams=" + SYMBOL + "@depth20@100ms"
+# 订阅信息
+SYMBOL = "zrousdt"
+SOCKET_TRADE = "wss://fx-ws.gateio.ws/v4/ws/usdt"
+SOCKET_DEPTH = "wss://fx-ws.gateio.ws/v4/ws/usdt"
+
+# 订阅消息格式
+SUBSCRIBE_TRADE = {
+    "id": 12312,  # 可以是任意的唯一 id
+    "method": "trades.subscribe",
+    "params": [SYMBOL]
+}
+
+SUBSCRIBE_DEPTH = {
+    "id": 12313,  # 可以是任意的唯一 id
+    "method": "depth.subscribe",
+    "params": [SYMBOL, 20, "0.1"]  # 20 是深度档位数量,"0.1" 是深度的精度
+}
 
 
 def on_error(_ws, _error):
@@ -18,13 +33,17 @@ def on_error(_ws, _error):
     # raise error                 # 重新抛出错误
 
 
-def on_open(_ws):
-    logger.info("### binance_ws opened ###")
+def on_open_trade(ws):
+    ws.send(json.dumps(SUBSCRIBE_TRADE))
+
+
+def on_open_depth(ws):
+    ws.send(json.dumps(SUBSCRIBE_DEPTH))
 
 
 # Create a WebSocket app
-ws_trade = websocket.WebSocketApp(SOCKET_TRADE, on_message=on_message_trade, on_error=on_error, on_open=on_open)
-ws_depth = websocket.WebSocketApp(SOCKET_DEPTH, on_message=on_message_depth, on_error=on_error, on_open=on_open)
+ws_trade = websocket.WebSocketApp(SOCKET_TRADE, on_message=on_message_trade, on_error=on_error, on_open=on_open_trade)
+ws_depth = websocket.WebSocketApp(SOCKET_DEPTH, on_message=on_message_depth, on_error=on_error, on_open=on_open_depth)
 
 # 定义要传递给 run_forever 的参数
 http_proxy_host = "127.0.0.1"