# ws_client.py
  1. import json
  2. import traceback
  3. import websocket
  4. import threading
  5. from logger_config import logger
  6. from data_processing import on_message_depth, on_message_trade, stop_event
  7. # Gate.io WebSocket API URL
  8. SYMBOL = "pepeusdt"
  9. SOCKET_TRADE = "wss://fx-ws.gateio.ws/v4/ws/usdt"
  10. # 订阅消息格式
  11. SUBSCRIBE_TRADE = {
  12. "id": 12312, # 可以是任意的唯一 id
  13. "method": "trades.subscribe",
  14. "params": [SYMBOL]
  15. }
  16. SUBSCRIBE_DEPTH = {
  17. "id": 12313, # 可以是任意的唯一 id
  18. "method": "depth.subscribe",
  19. "params": [SYMBOL, 20, "0.1"] # 20 是深度档位数量,"0.1" 是深度的精度
  20. }
  21. def on_error(_ws, _error):
  22. logger.error('捕获到一个异常。')
  23. traceback.print_exc() # 打印完整的错误堆栈信息
  24. def on_open(ws):
  25. logger.info('WebSocket 连接已打开')
  26. ws.send(json.dumps(SUBSCRIBE_TRADE)) # 发送订阅消息
  27. # Create a WebSocket app
  28. ws_trade = websocket.WebSocketApp(
  29. SOCKET_TRADE,
  30. on_message=on_message_trade,
  31. on_error=on_error,
  32. on_open=on_open
  33. )
  34. # 定义要传递给 run_forever 的参数
  35. http_proxy_host = "127.0.0.1"
  36. http_proxy_port = 7890
  37. proxy_type = "http"
  38. # Run the WebSocket with proxy settings
  39. ws_thread = threading.Thread(
  40. target=ws_trade.run_forever,
  41. kwargs={
  42. 'http_proxy_host': http_proxy_host,
  43. 'http_proxy_port': http_proxy_port,
  44. 'proxy_type': proxy_type
  45. },
  46. )
  47. def start_ws_clients():
  48. ws_thread.start()
  49. def stop_all_threads():
  50. stop_event.set()
  51. ws_thread.join()