ws_client.py 1.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. import json
  2. import time
  3. import traceback
  4. import websocket
  5. import threading
  6. from logger_config import logger
  7. from data_processing import on_message, stop_event
  8. # Gate.io WebSocket API URL
  9. SYMBOL = "ETH_USDT"
  10. SOCKET_TRADE = "wss://fx-ws.gateio.ws/v4/ws/usdt"
  11. # 订阅消息格式
  12. SUBSCRIBE_TRADE = {
  13. "time": int(time.time()),
  14. "channel": "futures.trades",
  15. "event": "subscribe",
  16. "payload": [SYMBOL]
  17. }
  18. SUBSCRIBE_DEPTH = {
  19. "time": int(time.time()),
  20. "channel": "futures.order_book",
  21. "event": "subscribe",
  22. "payload": [SYMBOL, "20", "0"]
  23. }
  24. def on_error(_ws, _error):
  25. logger.error('捕获到一个异常。')
  26. traceback.print_exc() # 打印完整的错误堆栈信息
  27. def on_open(ws):
  28. logger.info('WebSocket 连接已打开')
  29. ws.send(json.dumps(SUBSCRIBE_TRADE))
  30. ws.send(json.dumps(SUBSCRIBE_DEPTH))
  31. # Create a WebSocket app
  32. ws_trade = websocket.WebSocketApp(
  33. SOCKET_TRADE,
  34. on_message=on_message,
  35. on_error=on_error,
  36. on_open=on_open
  37. )
  38. # 定义要传递给 run_forever 的参数
  39. http_proxy_host = "127.0.0.1"
  40. http_proxy_port = 7890
  41. proxy_type = "http"
  42. # Run the WebSocket with proxy settings
  43. ws_thread = threading.Thread(
  44. target=ws_trade.run_forever,
  45. kwargs={
  46. 'http_proxy_host': http_proxy_host,
  47. 'http_proxy_port': http_proxy_port,
  48. 'proxy_type': proxy_type
  49. },
  50. )
  51. def start_ws_clients():
  52. ws_thread.start()
  53. def stop_all_threads():
  54. stop_event.set()
  55. ws_thread.join()