# ws_client.py
import sys
import threading
import traceback

import websocket

from logger_config import logger
from data_processing import on_message_depth, on_message_trade, stop_event
  6. # Binance WebSocket API URL
  7. SYMBOL = "1000pepe" + "usdt"
  8. SOCKET_TRADE = "wss://fstream.binance.com/stream?streams=" + SYMBOL + "@trade"
  9. SOCKET_DEPTH = "wss://fstream.binance.com/stream?streams=" + SYMBOL + "@depth20@100ms"
  10. def on_error(_ws, error):
  11. traceback.print_exc() # 打印完整的错误堆栈信息
  12. # raise error # 重新抛出错误
  13. def on_open(_ws):
  14. logger.info("### binance_ws opened ###")
  15. # Create a WebSocket app
  16. ws_trade = websocket.WebSocketApp(SOCKET_TRADE, on_message=on_message_trade, on_error=on_error, on_open=on_open)
  17. ws_depth = websocket.WebSocketApp(SOCKET_DEPTH, on_message=on_message_depth, on_error=on_error, on_open=on_open)
  18. # 定义要传递给 run_forever 的参数
  19. http_proxy_host = "127.0.0.1"
  20. http_proxy_port = 7890
  21. proxy_type = "http"
  22. # Run the WebSocket with proxy settings
  23. trade_thread = threading.Thread(target=ws_trade.run_forever, kwargs={
  24. 'http_proxy_host': http_proxy_host,
  25. 'http_proxy_port': http_proxy_port,
  26. 'proxy_type': proxy_type
  27. })
  28. depth_thread = threading.Thread(target=ws_depth.run_forever, kwargs={
  29. 'http_proxy_host': http_proxy_host,
  30. 'http_proxy_port': http_proxy_port,
  31. 'proxy_type': proxy_type
  32. })
  33. def start_ws_clients():
  34. trade_thread.start()
  35. depth_thread.start()
  36. def stop_all_threads():
  37. stop_event.set()
  38. trade_thread.join()
  39. depth_thread.join()