"""Gate.io futures WebSocket client for trades and order-book updates.

Builds one ``WebSocketApp`` that subscribes to the ``futures.trades`` and
``futures.order_book`` channels for ``SYMBOL`` and runs it on a background
thread through a local HTTP proxy.
"""
import json
import time
import traceback
import websocket
import threading

from logger_config import logger
from data_processing import on_message, stop_event

# Gate.io WebSocket API URL
SYMBOL = "ETH_USDT"
SOCKET_TRADE = "wss://fx-ws.gateio.ws/v4/ws/usdt"

# Subscription message templates. The "time" value here is captured at import
# time; on_open() overrides it with the current time before sending.
SUBSCRIBE_TRADE = {
    "time": int(time.time()),
    "channel": "futures.trades",
    "event": "subscribe",
    "payload": [SYMBOL],
}

SUBSCRIBE_DEPTH = {
    "time": int(time.time()),
    "channel": "futures.order_book",
    "event": "subscribe",
    "payload": [SYMBOL, "20", "0"],
}


def _with_current_time(template):
    """Return a copy of *template* whose "time" field is the current epoch second."""
    return {**template, "time": int(time.time())}


def on_error(_ws, _error):
    """Log that an error occurred and print its traceback.

    Args:
        _ws: The WebSocketApp that reported the error (unused).
        _error: The error object handed to the callback.
    """
    logger.error('捕获到一个异常。')
    if isinstance(_error, BaseException):
        # Print from the error object itself so the output does not depend on
        # this callback being invoked from inside an ``except`` block.
        traceback.print_exception(type(_error), _error, _error.__traceback__)
    else:
        # Not an exception instance: fall back to whatever exception, if any,
        # is currently being handled.
        traceback.print_exc()


def on_open(ws):
    """Send the trade and order-book subscriptions once the socket is open.

    Args:
        ws: The opened WebSocketApp; must provide ``send``.
    """
    logger.info('WebSocket 连接已打开')
    # Stamp each request at send time rather than reusing the import-time
    # value, which would be stale if the connection opens late or is reopened.
    ws.send(json.dumps(_with_current_time(SUBSCRIBE_TRADE)))
    ws.send(json.dumps(_with_current_time(SUBSCRIBE_DEPTH)))


# Create a WebSocket app
ws_trade = websocket.WebSocketApp(
    SOCKET_TRADE,
    on_message=on_message,
    on_error=on_error,
    on_open=on_open
)

# Arguments forwarded to run_forever
http_proxy_host = "127.0.0.1"
http_proxy_port = 7890
proxy_type = "http"

# Run the WebSocket with proxy settings
ws_thread = threading.Thread(
    target=ws_trade.run_forever,
    kwargs={
        'http_proxy_host': http_proxy_host,
        'http_proxy_port': http_proxy_port,
        'proxy_type': proxy_type
    },
)


def start_ws_clients():
    """Start the background thread that runs the WebSocket client."""
    ws_thread.start()


def stop_all_threads():
    """Signal shutdown, close the socket, and wait for the client thread."""
    stop_event.set()
    # run_forever() only returns once the app is closed; without this the
    # join below could block indefinitely.
    ws_trade.close()
    # join() raises RuntimeError on a thread that was never started.
    if ws_thread.is_alive():
        ws_thread.join()