| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768 |
- import json
- import traceback
- import websocket
- import threading
- from logger_config import logger
- from data_processing import on_message_depth, on_message_trade, stop_event
# Gate.io WebSocket API URL and the market symbol used in subscriptions.
SYMBOL = "pepeusdt"
SOCKET_TRADE = "wss://fx-ws.gateio.ws/v4/ws/usdt"

# Subscription request payloads (serialised with json.dumps before sending).
# NOTE(review): the id/method/params request shape looks like Gate.io's older
# v3 WebSocket protocol, while SOCKET_TRADE is a v4 endpoint -- confirm the
# expected request format and symbol spelling against the Gate.io docs.
SUBSCRIBE_TRADE = dict(
    id=12312,  # may be any unique id
    method="trades.subscribe",
    params=[SYMBOL],
)
SUBSCRIBE_DEPTH = dict(
    id=12313,  # may be any unique id
    method="depth.subscribe",
    # 20 is the number of depth levels, "0.1" is the depth precision.
    params=[SYMBOL, 20, "0.1"],
)
def on_error(_ws, _error):
    """Log that an error occurred and print its full traceback to stderr.

    Args:
        _ws: the connection object the callback was invoked for (unused).
        _error: the error handed to the callback.
    """
    logger.error('捕获到一个异常。')
    if isinstance(_error, BaseException):
        # Print the traceback carried by the exception itself, so the output
        # does not depend on this callback running inside an ``except`` block
        # (print_exc() would print "NoneType: None" otherwise).
        traceback.print_exception(type(_error), _error, _error.__traceback__)
    else:
        # Not an exception object: fall back to the currently handled one.
        traceback.print_exc()
def on_open(ws):
    """Log that the connection opened, then send the trade subscription.

    Args:
        ws: the connection object passed to the callback; its ``send``
            method receives the JSON-encoded ``SUBSCRIBE_TRADE`` request.
    """
    logger.info('WebSocket 连接已打开')
    subscribe_payload = json.dumps(SUBSCRIBE_TRADE)
    ws.send(subscribe_payload)
# WebSocket application for SOCKET_TRADE: on_open runs when the connection
# opens, on_message_trade receives messages, on_error receives errors.
ws_trade = websocket.WebSocketApp(
    SOCKET_TRADE,
    on_open=on_open,
    on_message=on_message_trade,
    on_error=on_error,
)
# Proxy parameters that are passed to run_forever.
http_proxy_host, http_proxy_port = "127.0.0.1", 7890
proxy_type = "http"
# Thread that runs the WebSocket loop (ws_trade.run_forever) with the proxy
# settings; it is created here but not started.
ws_thread = threading.Thread(
    target=ws_trade.run_forever,
    kwargs=dict(
        http_proxy_host=http_proxy_host,
        http_proxy_port=http_proxy_port,
        proxy_type=proxy_type,
    ),
)
def start_ws_clients() -> None:
    """Start the module-level ``ws_thread``.

    A ``threading.Thread`` can be started only once, so calling this a second
    time raises ``RuntimeError``.
    """
    ws_thread.start()
def stop_all_threads():
    """Signal shutdown, close the WebSocket app and wait for its thread.

    Sets ``stop_event``, then closes ``ws_trade`` so that the ``run_forever``
    call running in ``ws_thread`` is asked to stop instead of leaving
    ``join()`` to block indefinitely. The join is skipped when the thread is
    not running, because joining a thread that was never started raises
    ``RuntimeError``.
    """
    stop_event.set()
    # Without closing the app, run_forever keeps running and join() never returns.
    ws_trade.close()
    if ws_thread.is_alive():
        ws_thread.join()
|