"""WebSocket client that subscribes to the Gate.io trade stream for SYMBOL.

The module builds a ``websocket.WebSocketApp`` and a background thread that
runs it through a local HTTP proxy. Call ``start_ws_clients()`` to start the
thread and ``stop_all_threads()`` to shut it down.
"""
import json
import traceback
import websocket
import threading
from logger_config import logger
from data_processing import on_message_depth, on_message_trade, stop_event

# Gate.io WebSocket API URL
SYMBOL = "pepeusdt"
SOCKET_TRADE = "wss://fx-ws.gateio.ws/v4/ws/usdt"

# Subscription message format.
# NOTE(review): confirm that this id/method/params shape (and the symbol
# spelling) is what the endpoint in SOCKET_TRADE expects.
SUBSCRIBE_TRADE = {
    "id": 12312,  # may be any unique id
    "method": "trades.subscribe",
    "params": [SYMBOL],
}

# NOTE(review): SUBSCRIBE_DEPTH (and the imported on_message_depth) are not
# used anywhere in this file; only the trade subscription is sent in on_open.
SUBSCRIBE_DEPTH = {
    "id": 12313,  # may be any unique id
    "method": "depth.subscribe",
    # 20 is the number of depth levels, "0.1" is the depth precision
    "params": [SYMBOL, 20, "0.1"],
}


def on_error(_ws, _error):
    """Log an error reported by the WebSocket app and print its traceback.

    Args:
        _ws: The WebSocket app that reported the error (unused).
        _error: The reported error; normally an exception instance.
    """
    logger.error('捕获到一个异常。')
    if isinstance(_error, BaseException):
        # Print the traceback of the error that was actually passed in,
        # rather than relying on an exception being handled at call time
        # (traceback.print_exc() prints "NoneType: None" otherwise).
        traceback.print_exception(type(_error), _error, _error.__traceback__)
    else:
        # Non-exception payload: there is no traceback, so log its text.
        logger.error(str(_error))


def on_open(ws):
    """Send the trade subscription once the connection is open.

    Args:
        ws: The WebSocket app whose connection has just opened.
    """
    logger.info('WebSocket 连接已打开')
    ws.send(json.dumps(SUBSCRIBE_TRADE))  # send the subscription message


# Create a WebSocket app
ws_trade = websocket.WebSocketApp(
    SOCKET_TRADE,
    on_message=on_message_trade,
    on_error=on_error,
    on_open=on_open,
)

# Arguments passed through to run_forever
http_proxy_host = "127.0.0.1"
http_proxy_port = 7890
proxy_type = "http"

# Run the WebSocket with proxy settings
ws_thread = threading.Thread(
    target=ws_trade.run_forever,
    kwargs={
        'http_proxy_host': http_proxy_host,
        'http_proxy_port': http_proxy_port,
        'proxy_type': proxy_type,
    },
)


def start_ws_clients():
    """Start the background thread that runs the trade WebSocket."""
    ws_thread.start()


def stop_all_threads():
    """Signal shutdown, close the WebSocket and wait for its thread to exit."""
    stop_event.set()
    # run_forever() only returns once the connection is closed, so close the
    # app explicitly; otherwise the join below would block indefinitely.
    ws_trade.close()
    # Joining a thread that was never started raises RuntimeError, so only
    # wait when the thread is actually running.
    if ws_thread.is_alive():
        ws_thread.join()