| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287 |
- import json
- import websocket
- import threading
- import time
- import signal
- from typing import Callable, Any, Dict, List
# --- Configurable constants ---
BASE_URL = 'wss://wbs.mexc.com/ws'        # default WebSocket endpoint
DEFAULT_RECONNECT_DELAY_SECONDS = 5       # delay before the first reconnect attempt
DEFAULT_MAX_RECONNECT_DELAY_SECONDS = 60  # cap for the doubling reconnect delay
DEFAULT_PING_INTERVAL_SECONDS = 30        # pause between keep-alive pings


class MexcWSClient:
    """Background-thread WebSocket client for MEXC market-data streams.

    ``start()`` launches a worker thread that connects to ``base_url``, sends
    one SUBSCRIPTION request for ``subscriptions`` and passes every message
    that parses as JSON to ``on_message_callback``. When the connection ends
    without ``stop()`` having been called, the worker reconnects after a delay
    that doubles on every attempt (capped at ``max_reconnect_delay``) and is
    reset as soon as a connection opens. Each connection gets its own ping
    thread that sends ``{"method": "ping"}`` every ``ping_interval`` seconds.
    """

    def __init__(self,
                 symbol: str,
                 on_message_callback: Callable[[Dict[str, Any]], None],
                 subscriptions: List[str] = None,  # custom channels; None/empty -> default
                 base_url: str = BASE_URL,
                 reconnect_delay: int = DEFAULT_RECONNECT_DELAY_SECONDS,
                 max_reconnect_delay: int = DEFAULT_MAX_RECONNECT_DELAY_SECONDS,
                 ping_interval: int = DEFAULT_PING_INTERVAL_SECONDS):
        """Configure the client; no connection is made until ``start()``.

        Args:
            symbol: Trading pair, e.g. "BTCUSDT". It is upper-cased and used
                as the log prefix and to build the default subscription.
            on_message_callback: Called with each message after it has been
                parsed from JSON.
            subscriptions: Channel names to subscribe to, e.g.
                "spot@public.deals.v3.api@BTCUSDT". When omitted or empty the
                client subscribes to
                "spot@public.increase.depth.v3.api@{symbol}".
            base_url: WebSocket server URL.
            reconnect_delay: Initial reconnect delay in seconds.
            max_reconnect_delay: Upper bound for the reconnect delay in seconds.
            ping_interval: Seconds between two pings.
        """
        self.symbol = symbol.upper()
        self.on_message_callback = on_message_callback
        self.base_url = base_url
        self.initial_reconnect_delay = reconnect_delay
        self.max_reconnect_delay = max_reconnect_delay
        self.ping_interval = ping_interval

        if subscriptions:
            # Copy so later edits to the caller's list cannot change what is
            # re-sent on a reconnect.
            self.subscriptions = list(subscriptions)
        else:
            # Default: incremental order-book depth for the symbol.
            self.subscriptions = [f"spot@public.increase.depth.v3.api@{self.symbol}"]

        self.current_reconnect_delay = self.initial_reconnect_delay
        self.shutdown_signal = threading.Event()  # set by stop(); ends every loop
        self.user_initiated_close = False
        self.ws_app = None       # websocket.WebSocketApp while a connection attempt is live
        self.main_thread = None  # thread running _run()
        self.ping_thread = None  # ping thread of the most recent connection
        # Stop flag of the current connection's ping thread. A fresh Event is
        # created per connection so an old ping thread can never outlive it.
        self._ping_stop = threading.Event()
        print(f"MexcWSClient initialized for symbol: {self.symbol}")
        print(f"Subscriptions: {self.subscriptions}")

    def _on_message(self, wsapp, message_str: str):
        """Parse one incoming frame as JSON and forward it to the user callback."""
        try:
            data = json.loads(message_str)
        except ValueError:
            # json.JSONDecodeError and UnicodeDecodeError (undecodable binary
            # frame) are both ValueError subclasses.
            print(f"[{self.symbol}] Failed to parse message as JSON: {message_str[:100]}...")
            return

        if not self.on_message_callback:
            return
        try:
            self.on_message_callback(data)
        except Exception as e:
            # Best effort on purpose: a failing user callback must not tear
            # down the connection.
            print(f"[{self.symbol}] Error processing message in callback: {e}")

    def _on_error(self, wsapp, error: Exception):
        """Log a connection error; reconnecting is left to the ``_run`` loop."""
        print(f"[{self.symbol}] Connection error: {error}")
        if isinstance(error, websocket.WebSocketConnectionClosedException):
            print(f"[{self.symbol}] Connection closed unexpectedly due to error.")
        elif isinstance(error, ConnectionRefusedError):
            print(f"[{self.symbol}] Connection refused by server.")

    def _on_close(self, wsapp, close_status_code: int, close_msg: str):
        """Log the close event and stop the ping thread of this connection."""
        print(f"[{self.symbol}] Connection closed. Status: {close_status_code}, Message: '{close_msg}'")
        # Wake the ping thread right away instead of letting it sleep out the
        # rest of its interval.
        self._ping_stop.set()

        if not self.user_initiated_close and not self.shutdown_signal.is_set():
            print(f"[{self.symbol}] Will attempt to reconnect...")
        else:
            print(f"[{self.symbol}] Close was user-initiated or app is shutting down. No reconnect.")

    def _send_ping(self, wsapp=None, stop_event=None):
        """Send a ping every ``ping_interval`` seconds until told to stop.

        Args:
            wsapp: Connection to ping. Defaults to ``self.ws_app`` as it is
                when the method starts.
            stop_event: Event that ends the loop when set. Defaults to the
                client-wide shutdown signal.

        The loop also ends when the shutdown signal is set, when ``wsapp`` is
        no longer connected, or when sending fails.
        """
        if wsapp is None:
            wsapp = self.ws_app
        if stop_event is None:
            stop_event = self.shutdown_signal

        # NOTE(review): MEXC's documentation spells the method "PING" in upper
        # case; confirm the server accepts this lower-case form.
        ping_message = json.dumps({"method": "ping"})

        while not self.shutdown_signal.is_set():
            # Sleep for one interval, waking early if this session is stopped.
            if stop_event.wait(timeout=self.ping_interval):
                break
            if self.shutdown_signal.is_set():
                break
            sock = getattr(wsapp, "sock", None)
            if sock is None or not sock.connected:
                break  # connection is gone
            try:
                wsapp.send(ping_message)
            except Exception as e:
                print(f"[{self.symbol}] Error sending ping: {e}")
                break  # most likely the connection dropped
        print(f"[{self.symbol}] Ping thread finished.")

    def _stop_ping_thread(self, join_timeout=None):
        """Signal the current ping thread to stop; optionally wait for it."""
        self._ping_stop.set()
        worker = self.ping_thread
        if join_timeout is None or worker is None or not worker.is_alive():
            return
        if worker is not threading.current_thread():
            worker.join(timeout=join_timeout)

    def _start_ping_thread(self, wsapp):
        """Replace any previous ping thread with one bound to ``wsapp``."""
        self._stop_ping_thread(join_timeout=1)
        session_stop = threading.Event()
        self._ping_stop = session_stop
        if self.shutdown_signal.is_set():
            # stop() may have signalled the previous event a moment ago.
            session_stop.set()
        self.ping_thread = threading.Thread(
            target=self._send_ping, args=(wsapp, session_stop), daemon=True)
        self.ping_thread.start()

    def _on_open(self, wsapp):
        """Subscribe to the configured channels and start the ping thread."""
        print(f"[{self.symbol}] Connection opened....")
        if self.shutdown_signal.is_set():
            # stop() was called while the connection was being established.
            print(f"[{self.symbol}] Shutdown requested; closing the new connection.")
            wsapp.close()
            return

        self.current_reconnect_delay = self.initial_reconnect_delay  # reset back-off
        subscription_payload = {
            "method": "SUBSCRIPTION",
            "params": self.subscriptions,
        }
        try:
            encoded = json.dumps(subscription_payload)
            print(f"[{self.symbol}] Sending subscription: {encoded}")
            wsapp.send(encoded)
            print(f"[{self.symbol}] Subscription sent.")
            self._start_ping_thread(wsapp)
            print(f"[{self.symbol}] Ping thread started.")
        except Exception as e:
            print(f"[{self.symbol}] Error during on_open: {e}")
            # Close the half-initialised connection so the reconnect loop in
            # _run() takes over.
            wsapp.close()

    def _connect_and_run_forever(self):
        """Open one connection and block until it has closed."""
        print(f"[{self.symbol}] Attempting to connect to {self.base_url}...")
        ws_app = websocket.WebSocketApp(self.base_url,
                                        on_message=self._on_message,
                                        on_error=self._on_error,
                                        on_close=self._on_close,
                                        on_open=self._on_open)
        self.ws_app = ws_app
        try:
            # stop() may have run before ws_app was published; do not connect then.
            if not self.shutdown_signal.is_set():
                ws_app.run_forever()  # blocks until the connection is closed
        except Exception as e:
            print(f"[{self.symbol}] Exception in WebSocketApp.run_forever scope: {e}")
        finally:
            self._ping_stop.set()
            # Drop the instance so the next attempt creates a fresh one.
            self.ws_app = None

    def _run(self):
        """Worker loop: connect, and reconnect with a doubling delay."""
        while not self.shutdown_signal.is_set():
            self._connect_and_run_forever()  # blocks until the connection closes
            if self.shutdown_signal.is_set() or self.user_initiated_close:
                break  # deliberate shutdown: no reconnect
            print(f"[{self.symbol}] Disconnected. Will attempt to reconnect in {self.current_reconnect_delay} seconds.")
            if self.shutdown_signal.wait(timeout=self.current_reconnect_delay):
                print(f"[{self.symbol}] Shutdown signal received during reconnect delay.")
                break

            self.current_reconnect_delay = min(self.current_reconnect_delay * 2,
                                               self.max_reconnect_delay)

        print(f"[{self.symbol}] Main client loop finished.")

    def start(self):
        """Start the client in a new daemon thread; no-op if already running."""
        if self.main_thread and self.main_thread.is_alive():
            print(f"[{self.symbol}] Client is already running.")
            return
        print(f"[{self.symbol}] Starting client...")
        self.shutdown_signal.clear()
        self.user_initiated_close = False
        # A restart begins again with the initial back-off.
        self.current_reconnect_delay = self.initial_reconnect_delay
        self.main_thread = threading.Thread(target=self._run, daemon=True)
        self.main_thread.start()

    def _join_worker(self, worker, timeout, label):
        """Wait up to ``timeout`` seconds for ``worker``; report if it is still alive."""
        if worker is None or not worker.is_alive():
            return
        if worker is threading.current_thread():
            # stop() was called from this very thread (e.g. inside the message
            # callback); joining it would raise RuntimeError.
            return
        worker.join(timeout=timeout)
        if worker.is_alive():
            print(f"[{self.symbol}] {label} did not terminate gracefully.")

    def stop(self):
        """Stop the client: close the connection and wait for its threads.

        Waits up to 5 seconds for the ping thread and up to 10 seconds for the
        worker thread. May be called from any thread, including from within
        the message callback.
        """
        print(f"[{self.symbol}] Initiating graceful shutdown...")
        self.user_initiated_close = True
        self.shutdown_signal.set()  # ends the reconnect loop
        self._ping_stop.set()       # wakes the ping thread immediately

        # Snapshot: the worker thread resets self.ws_app to None when the
        # connection ends, so the attribute must not be read twice.
        ws_app = self.ws_app
        if ws_app is not None:
            print(f"[{self.symbol}] Attempting to close WebSocket connection...")
            ws_app.close()  # triggers _on_close

        self._join_worker(self.ping_thread, 5, "Ping thread")
        self._join_worker(self.main_thread, 10, "Main client thread")

        print(f"[{self.symbol}] Client shutdown process complete.")

    def join(self, timeout=None):
        """Wait for the worker thread to finish.

        Args:
            timeout: Maximum number of seconds to wait; ``None`` waits
                indefinitely.
        """
        worker = self.main_thread
        if worker is None or not worker.is_alive():
            return
        if worker is not threading.current_thread():
            worker.join(timeout)
# --- Example usage ---
# Module-level registry so the signal handler can reach every client instance.
active_clients: List[MexcWSClient] = []


def handle_ctrl_c(sig, frame):
    """Signal handler: stop every registered client, then pause for two seconds.

    Args:
        sig: Number of the received signal (not used).
        frame: Stack frame that was interrupted (not used).
    """
    print("\nCtrl+C received. Stopping all clients...")
    for registered in active_clients:
        registered.stop()
    # Short grace period for the clients to finish closing. The handler then
    # returns; calling sys.exit(0) here would be an alternative to letting the
    # main program end on its own.
    grace_seconds = 2
    time.sleep(grace_seconds)
def my_btc_message_handler(data: Dict[str, Any]):
    """User-defined callback for BTCUSDT messages: print each parsed payload.

    Args:
        data: One message, already parsed from JSON.
    """
    line = f"[BTC_HANDLER] Received data: {data}"
    print(line)
    # Disabled example: print the first ask price of a depth update.
    # if 'c' in data and data['c'] == 'spot@public.increase.depth.v3.api@BTCUSDT':
    #     if 'd' in data and 'asks' in data['d'] and data['d']['asks']:
    #         first_ask_price = data['d']['asks'][0][0]
    #         print(f"[BTC_HANDLER] BTCUSDT First Ask Price: {first_ask_price}")
    #     # else:
    #     #     print(f"[BTC_HANDLER] No asks data in depth update: {data.get('d')}")
- # def my_eth_message_handler(data: Dict[str, Any]):
- # # User-defined handler for ETHUSDT trade (deals) messages.
- # print(f"[ETH_HANDLER] Received data: {data}")
- # # if 'c' in data and data['c'] == 'spot@public.deals.v3.api@ETHUSDT':
- # # if 'd' in data and 'deals' in data['d'] and data['d']['deals']:
- # # for deal in data['d']['deals']:
- # # print(f"[ETH_HANDLER] ETHUSDT Deal: Price={deal.get('p')}, Qty={deal.get('q')}, Time={deal.get('t')}")
if __name__ == "__main__":
    # Route both Ctrl+C (SIGINT) and SIGTERM to the shared shutdown handler.
    for _signum in (signal.SIGINT, signal.SIGTERM):
        signal.signal(_signum, handle_ctrl_c)
    print("Application starting. Press Ctrl+C to stop.")

    # First client: BTCUSDT with the default incremental-depth subscription.
    btc_client = MexcWSClient(symbol="BTCUSDT", on_message_callback=my_btc_message_handler)
    active_clients.append(btc_client)
    btc_client.start()

    # Disabled example: a second client (ETHUSDT trades) with custom channels.
    # eth_subscriptions = ["spot@public.deals.v3.api@ETHUSDT"]
    # eth_client = MexcWSClient(
    #     symbol="ETHUSDT",
    #     on_message_callback=my_eth_message_handler,
    #     subscriptions=eth_subscriptions
    # )
    # active_clients.append(eth_client)
    # eth_client.start()

    # Keep the main thread alive until the clients have been shut down.
    try:
        while True:
            workers_alive = any(
                c.main_thread and c.main_thread.is_alive() for c in active_clients
            )
            if not workers_alive:
                # Leave only when the stop was requested by the user.
                if any(c.user_initiated_close for c in active_clients):
                    print("All clients have stopped. Exiting main loop.")
                    break
            time.sleep(1)  # poll once per second instead of spinning
    except KeyboardInterrupt:
        # Fallback only: the signal handler registered above normally handles SIGINT.
        print("Main loop interrupted by KeyboardInterrupt.")
    finally:
        print("Cleaning up remaining client resources (if any)...")
        for leftover in active_clients:
            if not leftover.user_initiated_close:
                # Not stopped by the signal handler, so stop it here.
                leftover.stop()
            leftover.join(timeout=5)  # final attempt to wait for its thread
        print("Application shut down.")
|