| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455 |
- import traceback
- import websocket
- import threading
- from logger_config import logger
- from data_processing import on_message_depth, on_message_trade, stop_event
- # Binance WebSocket API URL
- SYMBOL = "not" + "usdt"
- SOCKET_TRADE = "wss://fstream.binance.com/stream?streams=" + SYMBOL + "@trade"
- SOCKET_DEPTH = "wss://fstream.binance.com/stream?streams=" + SYMBOL + "@depth20@100ms"
- def on_error(_ws, _error):
- logger.error('捕获到一个异常。')
- traceback.print_exc() # 打印完整的错误堆栈信息
- # raise error # 重新抛出错误
- def on_open(_ws):
- logger.info("### binance_ws opened ###")
- # Create a WebSocket app
- ws_trade = websocket.WebSocketApp(SOCKET_TRADE, on_message=on_message_trade, on_error=on_error, on_open=on_open)
- ws_depth = websocket.WebSocketApp(SOCKET_DEPTH, on_message=on_message_depth, on_error=on_error, on_open=on_open)
- # 定义要传递给 run_forever 的参数
- http_proxy_host = "127.0.0.1"
- http_proxy_port = 7890
- proxy_type = "http"
- # Run the WebSocket with proxy settings
- trade_thread = threading.Thread(target=ws_trade.run_forever, kwargs={
- 'http_proxy_host': http_proxy_host,
- 'http_proxy_port': http_proxy_port,
- 'proxy_type': proxy_type
- })
- depth_thread = threading.Thread(target=ws_depth.run_forever, kwargs={
- 'http_proxy_host': http_proxy_host,
- 'http_proxy_port': http_proxy_port,
- 'proxy_type': proxy_type
- })
- def start_ws_clients():
- trade_thread.start()
- depth_thread.start()
- def stop_all_threads():
- stop_event.set()
- trade_thread.join()
- depth_thread.join()
|