Browse Source

gate的ws连接问题解决了。

skyffire 1 year ago
parent
commit
2234355825
3 changed files with 24 additions and 30 deletions
  1. 1 1
      kappa/data_processing.py
  2. 1 1
      kappa/main.py
  3. 22 28
      kappa/ws_client.py

+ 1 - 1
kappa/data_processing.py

@@ -43,7 +43,7 @@ def get_tick_size_from_prices(ask_price, bid_price):
 def on_message_trade(_ws, message):
     global trade_snapshots
     json_message = json.loads(message)
-    logger.debug(json_message)
+    logger.info(json_message)
     # trade = {
     #     'price': float(json_message['data']['p']),
     #     'qty': float(json_message['data']['q']),

+ 1 - 1
kappa/main.py

@@ -1,4 +1,4 @@
-import asyncio
+# import asyncio
 from ws_client import start_ws_clients
 import warnings
 

+ 22 - 28
kappa/ws_client.py

@@ -7,11 +7,9 @@ import threading
 from logger_config import logger
 from data_processing import on_message_depth, on_message_trade, stop_event
 
-# Binance WebSocket API URL
-# 订阅信息
-SYMBOL = "zrousdt"
+# Gate.io WebSocket API URL
+SYMBOL = "pepeusdt"
 SOCKET_TRADE = "wss://fx-ws.gateio.ws/v4/ws/usdt"
-SOCKET_DEPTH = "wss://fx-ws.gateio.ws/v4/ws/usdt"
 
 # 订阅消息格式
 SUBSCRIBE_TRADE = {
@@ -29,21 +27,21 @@ SUBSCRIBE_DEPTH = {
 
 def on_error(_ws, _error):
     logger.error('捕获到一个异常。')
-    traceback.print_exc()       # 打印完整的错误堆栈信息
-    # raise error                 # 重新抛出错误
+    traceback.print_exc()  # 打印完整的错误堆栈信息
 
 
-def on_open_trade(ws):
-    ws.send(json.dumps(SUBSCRIBE_TRADE))
-
-
-def on_open_depth(ws):
-    ws.send(json.dumps(SUBSCRIBE_DEPTH))
+def on_open(ws):
+    logger.info('WebSocket 连接已打开')
+    ws.send(json.dumps(SUBSCRIBE_TRADE))  # 发送订阅消息
 
 
 # Create a WebSocket app
-ws_trade = websocket.WebSocketApp(SOCKET_TRADE, on_message=on_message_trade, on_error=on_error, on_open=on_open_trade)
-ws_depth = websocket.WebSocketApp(SOCKET_DEPTH, on_message=on_message_depth, on_error=on_error, on_open=on_open_depth)
+ws_trade = websocket.WebSocketApp(
+    SOCKET_TRADE,
+    on_message=on_message_trade,
+    on_error=on_error,
+    on_open=on_open
+)
 
 # 定义要传递给 run_forever 的参数
 http_proxy_host = "127.0.0.1"
@@ -51,24 +49,20 @@ http_proxy_port = 7890
 proxy_type = "http"
 
 # Run the WebSocket with proxy settings
-trade_thread = threading.Thread(target=ws_trade.run_forever, kwargs={
-    'http_proxy_host': http_proxy_host,
-    'http_proxy_port': http_proxy_port,
-    'proxy_type': proxy_type
-})
-depth_thread = threading.Thread(target=ws_depth.run_forever, kwargs={
-    'http_proxy_host': http_proxy_host,
-    'http_proxy_port': http_proxy_port,
-    'proxy_type': proxy_type
-})
+ws_thread = threading.Thread(
+    target=ws_trade.run_forever,
+    kwargs={
+        'http_proxy_host': http_proxy_host,
+        'http_proxy_port': http_proxy_port,
+        'proxy_type': proxy_type
+    },
+)
 
 
 def start_ws_clients():
-    trade_thread.start()
-    depth_thread.start()
+    ws_thread.start()
 
 
 def stop_all_threads():
     stop_event.set()
-    trade_thread.join()
-    depth_thread.join()
+    ws_thread.join()