| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172 |
- import json
- import time
- import traceback
- import websocket
- import threading
- from logger_config import logger
- from data_processing import on_message, stop_event
- # Gate.io WebSocket API URL
- SYMBOL = "ETH_USDT"
- SOCKET_TRADE = "wss://fx-ws.gateio.ws/v4/ws/usdt"
- # 订阅消息格式
- SUBSCRIBE_TRADE = {
- "time": int(time.time()),
- "channel": "futures.trades",
- "event": "subscribe",
- "payload": [SYMBOL]
- }
- SUBSCRIBE_DEPTH = {
- "time": int(time.time()),
- "channel": "futures.order_book",
- "event": "subscribe",
- "payload": [SYMBOL, "20", "0"]
- }
- def on_error(_ws, _error):
- logger.error('捕获到一个异常。')
- traceback.print_exc() # 打印完整的错误堆栈信息
- def on_open(ws):
- logger.info('WebSocket 连接已打开')
- ws.send(json.dumps(SUBSCRIBE_TRADE))
- ws.send(json.dumps(SUBSCRIBE_DEPTH))
- # Create a WebSocket app
- ws_trade = websocket.WebSocketApp(
- SOCKET_TRADE,
- on_message=on_message,
- on_error=on_error,
- on_open=on_open
- )
- # 定义要传递给 run_forever 的参数
- http_proxy_host = "127.0.0.1"
- http_proxy_port = 7890
- proxy_type = "http"
- # Run the WebSocket with proxy settings
- ws_thread = threading.Thread(
- target=ws_trade.run_forever,
- kwargs={
- 'http_proxy_host': http_proxy_host,
- 'http_proxy_port': http_proxy_port,
- 'proxy_type': proxy_type
- },
- )
- def start_ws_clients():
- ws_thread.start()
- def stop_all_threads():
- stop_event.set()
- ws_thread.join()
|