Explorar el Código

gate数据标准化了,但是还有点小问题。

skyffire hace 1 año
padre
commit
696de3df8d
Se han modificado 2 ficheros con 77 adiciones y 44 borrados
  1. 63 34
      kappa/data_processing.py
  2. 14 10
      kappa/ws_client.py

+ 63 - 34
kappa/data_processing.py

@@ -29,6 +29,16 @@ bounds = [(10, 1000.0),             # A 的范围
 # 假设S0是初始的参考价格
 S0 = -1
 
+
+def on_message(_ws, message):
+    json_message = json.loads(message)
+
+    if json_message["channel"] == "futures.order_book" and json_message["event"] == "all":
+        on_message_depth(json_message)
+    elif json_message["channel"] == "futures.trades" and json_message["event"] == "update":
+        on_message_trade(json_message)
+
+
 def get_tick_size_from_prices(ask_price, bid_price):
     # 获取价格的小数位数
     ask_decimal_places = len(str(ask_price).split('.')[1])
@@ -40,42 +50,61 @@ def get_tick_size_from_prices(ask_price, bid_price):
     return tick_size
 
 
-def on_message_trade(_ws, message):
+def on_message_trade(json_message):
     global trade_snapshots
-    json_message = json.loads(message)
-    logger.info(json_message)
-    # trade = {
-    #     'price': float(json_message['data']['p']),
-    #     'qty': float(json_message['data']['q']),
-    #     'timestamp': pd.to_datetime(json_message['data']['T'], unit='ms'),
-    #     'side': 'sell' if json_message['data']['m'] else 'buy'
-    # }
-    # trade_snapshots.append(trade)
-    # process_depth_data()
-
-
-def on_message_depth(_ws, message):
+
+    # 解析 Gate.io 的成交信息
+    for trade_origin in json_message['result']:
+        price = trade_origin['price']
+        size = trade_origin['size']
+        timestamp = pd.to_datetime(trade_origin['create_time_ms'], unit='ms')
+
+        # 转换为需要的格式
+        trade = {
+            'price': float(price),
+            'qty': float(size),
+            'timestamp': timestamp
+        }
+
+        # 买卖方向判断
+        if trade["qty"] > 0:
+            trade["side"] = "buy"
+        else:
+            trade["side"] = "sell"
+
+        trade_snapshots.append(trade)
+        process_depth_data()
+
+
+def on_message_depth(json_message):
     global order_book_snapshots, spread_delta_snapshots
-    json_message = json.loads(message)
-    # bids = [[float(price), float(quantity)] for price, quantity in json_message['data']['b'][:10]]
-    # asks = [[float(price), float(quantity)] for price, quantity in json_message['data']['a'][:10]]
-    # timestamp = pd.to_datetime(json_message['data']['E'], unit='ms')
-    # depth = {
-    #     'bids': bids,
-    #     'asks': asks,
-    #     'timestamp': timestamp
-    # }
-    # order_book_snapshots.append(depth)
-    #
-    # # 求价差
-    # ask_price = Decimal(str(asks[0][0]))
-    # bid_price = Decimal(str(bids[0][0]))
-    # tick_size = get_tick_size_from_prices(ask_price, bid_price)
-    # spread = float(ask_price - bid_price)
-    # spread_delta = int(spread / tick_size)
-    # spread_delta_snapshots.append(spread_delta)
-    #
-    # process_depth_data()
+
+    # 解析 Gate.io 的深度信息
+    result = json_message['result']
+    bids = result['bids']
+    asks = result['asks']
+    timestamp = pd.to_datetime(result['t'], unit='ms')
+
+    # 转换为 Binance 兼容的格式
+    asks_converted = [[float(ask['p']), float(ask['s'])] for ask in asks[:10]]
+    bids_converted = [[float(bid['p']), float(bid['s'])] for bid in bids[:10]]
+
+    depth = {
+        'bids': bids_converted,
+        'asks': asks_converted,
+        'timestamp': timestamp
+    }
+    order_book_snapshots.append(depth)
+
+    # 求价差
+    ask_price = Decimal(str(asks_converted[0][0]))
+    bid_price = Decimal(str(bids_converted[0][0]))
+    tick_size = get_tick_size_from_prices(ask_price, bid_price)
+    spread = float(ask_price - bid_price)
+    spread_delta = int(spread / tick_size)
+    spread_delta_snapshots.append(spread_delta)
+
+    process_depth_data()
 
 
 def calculate_phi(prices, k, S0):

+ 14 - 10
kappa/ws_client.py

@@ -1,27 +1,30 @@
 import json
+import time
 import traceback
 
 import websocket
 import threading
 
 from logger_config import logger
-from data_processing import on_message_depth, on_message_trade, stop_event
+from data_processing import on_message, stop_event
 
 # Gate.io WebSocket API URL
-SYMBOL = "pepeusdt"
+SYMBOL = "SATS_USDT"
 SOCKET_TRADE = "wss://fx-ws.gateio.ws/v4/ws/usdt"
 
 # 订阅消息格式
 SUBSCRIBE_TRADE = {
-    "id": 12312,  # 可以是任意的唯一 id
-    "method": "trades.subscribe",
-    "params": [SYMBOL]
+    "time": int(time.time()),
+    "channel": "futures.trades",
+    "event": "subscribe",
+    "payload": [SYMBOL]
 }
 
 SUBSCRIBE_DEPTH = {
-    "id": 12313,  # 可以是任意的唯一 id
-    "method": "depth.subscribe",
-    "params": [SYMBOL, 20, "0.1"]  # 20 是深度档位数量,"0.1" 是深度的精度
+    "time": int(time.time()),
+    "channel": "futures.order_book",
+    "event": "subscribe",
+    "payload": [SYMBOL, "20", "0"]
 }
 
 
@@ -32,13 +35,14 @@ def on_error(_ws, _error):
 
 def on_open(ws):
     logger.info('WebSocket 连接已打开')
-    ws.send(json.dumps(SUBSCRIBE_TRADE))  # 发送订阅消息
+    ws.send(json.dumps(SUBSCRIBE_TRADE))
+    ws.send(json.dumps(SUBSCRIBE_DEPTH))
 
 
 # Create a WebSocket app
 ws_trade = websocket.WebSocketApp(
     SOCKET_TRADE,
-    on_message=on_message_trade,
+    on_message=on_message,
     on_error=on_error,
     on_open=on_open
 )