# ws_client.py (2.0 KB)
import os
import threading
import traceback

import websocket

from logger_config import logger
from data_processing import on_message_depth, on_message_trade, stop_event
  7. # Binance WebSocket API URL
  8. SYMBOL = "1000pepe" + "usdt"
  9. SOCKET_TRADE = "wss://fstream.binance.com/stream?streams=" + SYMBOL + "@trade"
  10. SOCKET_DEPTH = "wss://fstream.binance.com/stream?streams=" + SYMBOL + "@depth20@100ms"
  11. def on_error(_ws, _error):
  12. logger.error('捕获到一个异常。')
  13. traceback.print_exc() # 打印完整的错误堆栈信息
  14. # raise error # 重新抛出错误
  15. def on_open(_ws):
  16. logger.info("### binance_ws opened ###")
  17. # Create a WebSocket app
  18. ws_trade = websocket.WebSocketApp(SOCKET_TRADE, on_message=on_message_trade, on_error=on_error, on_open=on_open)
  19. ws_depth = websocket.WebSocketApp(SOCKET_DEPTH, on_message=on_message_depth, on_error=on_error, on_open=on_open)
  20. # 识别开发模式
  21. app_mode = os.getenv('APP_MODE', 'production')
  22. if app_mode == 'development':
  23. # 在开发模式下执行的代码
  24. # 定义要传递给 run_forever 的参数
  25. http_proxy_host = "127.0.0.1"
  26. http_proxy_port = 7890
  27. proxy_type = "http"
  28. # Run the WebSocket with proxy settings
  29. trade_thread = threading.Thread(target=ws_trade.run_forever, kwargs={
  30. 'http_proxy_host': http_proxy_host,
  31. 'http_proxy_port': http_proxy_port,
  32. 'proxy_type': proxy_type
  33. })
  34. depth_thread = threading.Thread(target=ws_depth.run_forever, kwargs={
  35. 'http_proxy_host': http_proxy_host,
  36. 'http_proxy_port': http_proxy_port,
  37. 'proxy_type': proxy_type
  38. })
  39. # 在生产模式下执行的代码
  40. else:
  41. # Run the WebSocket with proxy settings
  42. trade_thread = threading.Thread(target=ws_trade.run_forever)
  43. depth_thread = threading.Thread(target=ws_depth.run_forever)
  44. def start_ws_clients():
  45. trade_thread.start()
  46. depth_thread.start()
  47. def stop_all_threads():
  48. stop_event.set()
  49. trade_thread.join()
  50. depth_thread.join()