mexc_ws_client.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287
  1. import json
  2. import websocket
  3. import threading
  4. import time
  5. import signal
  6. from typing import Callable, Any, Dict, List
  7. # --- 可配置的常量 ---
  8. BASE_URL = 'wss://wbs.mexc.com/ws'
  9. DEFAULT_RECONNECT_DELAY_SECONDS = 5
  10. DEFAULT_MAX_RECONNECT_DELAY_SECONDS = 60
  11. DEFAULT_PING_INTERVAL_SECONDS = 30
  12. class MexcWSClient:
  13. def __init__(self,
  14. symbol: str,
  15. on_message_callback: Callable[[Dict[str, Any]], None],
  16. subscriptions: List[str] = None, # 允许自定义订阅频道
  17. base_url: str = BASE_URL,
  18. reconnect_delay: int = DEFAULT_RECONNECT_DELAY_SECONDS,
  19. max_reconnect_delay: int = DEFAULT_MAX_RECONNECT_DELAY_SECONDS,
  20. ping_interval: int = DEFAULT_PING_INTERVAL_SECONDS):
  21. """
  22. MEXC WebSocket 客户端
  23. Args:
  24. symbol (str): 交易对,例如 "BTCUSDT". 将用于构建默认订阅频道。
  25. on_message_callback (Callable[[Dict[str, Any]], None]): 接收到并解析后的消息回调函数。
  26. subscriptions (List[str], optional): 自定义订阅频道列表。
  27. 如果提供,则使用此列表,格式如 "spot@public.deals.v3.api@BTCUSDT"。
  28. 如果不提供,则使用 symbol 构建默认订阅:"spot@public.increase.depth.v3.api@{symbol}"。
  29. base_url (str, optional): WebSocket 服务器 URL.
  30. reconnect_delay (int, optional): 初始重连延迟(秒).
  31. max_reconnect_delay (int, optional): 最大重连延迟(秒).
  32. ping_interval (int, optional): Ping 发送间隔(秒).
  33. """
  34. self.symbol = symbol.upper()
  35. self.on_message_callback = on_message_callback
  36. self.base_url = base_url
  37. self.initial_reconnect_delay = reconnect_delay
  38. self.max_reconnect_delay = max_reconnect_delay
  39. self.ping_interval = ping_interval
  40. if subscriptions:
  41. self.subscriptions = subscriptions
  42. else:
  43. # 默认订阅增量深度,可以根据需要修改或允许通过参数配置
  44. self.subscriptions = [f"spot@public.increase.depth.v3.api@{self.symbol}"]
  45. self.current_reconnect_delay = self.initial_reconnect_delay
  46. self.shutdown_signal = threading.Event()
  47. self.user_initiated_close = False
  48. self.ws_app: websocket.WebSocketApp = None
  49. self.main_thread: threading.Thread = None
  50. self.ping_thread: threading.Thread = None
  51. print(f"MexcWSClient initialized for symbol: {self.symbol}")
  52. print(f"Subscriptions: {self.subscriptions}")
  53. def _on_message(self, wsapp, message_str: str):
  54. try:
  55. data = json.loads(message_str)
  56. # 将解析后的数据传递给用户提供的回调
  57. if self.on_message_callback:
  58. self.on_message_callback(data)
  59. except json.JSONDecodeError:
  60. print(f"[{self.symbol}] Failed to parse message as JSON: {message_str[:100]}...")
  61. except Exception as e:
  62. print(f"[{self.symbol}] Error processing message in callback: {e}")
  63. def _on_error(self, wsapp, error: Exception):
  64. print(f"[{self.symbol}] Connection error: {error}")
  65. if isinstance(error, websocket.WebSocketConnectionClosedException):
  66. print(f"[{self.symbol}] Connection closed unexpectedly due to error.")
  67. elif isinstance(error, ConnectionRefusedError):
  68. print(f"[{self.symbol}] Connection refused by server.")
  69. # 在这里,run_forever 应该会退出,然后重连逻辑会接管
  70. def _on_close(self, wsapp, close_status_code: int, close_msg: str):
  71. print(f"[{self.symbol}] Connection closed. Status: {close_status_code}, Message: '{close_msg}'")
  72. # Ping 线程可能在此处自然结束或需要显式停止信号
  73. if self.ping_thread and self.ping_thread.is_alive():
  74. # 通常 ping 线程会因为 ws_app.sock.connected 变为 False 而退出
  75. pass
  76. if not self.user_initiated_close and not self.shutdown_signal.is_set():
  77. print(f"[{self.symbol}] Will attempt to reconnect...")
  78. else:
  79. print(f"[{self.symbol}] Close was user-initiated or app is shutting down. No reconnect.")
  80. def _send_ping(self):
  81. ping_count = 0
  82. while not self.shutdown_signal.is_set():
  83. # 等待 ping_interval 时间,或者直到 shutdown_signal 被设置
  84. if self.shutdown_signal.wait(timeout=self.ping_interval):
  85. break # Shutdown signal received
  86. if self.ws_app and self.ws_app.sock and self.ws_app.sock.connected:
  87. try:
  88. ping_payload = {"method": "ping"}
  89. self.ws_app.send(json.dumps(ping_payload))
  90. # print(f"[{self.symbol}] Sent ping {ping_count}")
  91. ping_count += 1
  92. except Exception as e:
  93. print(f"[{self.symbol}] Error sending ping: {e}")
  94. break # 可能是连接已断开
  95. else:
  96. # print(f"[{self.symbol}] Ping thread: Connection not active. Stopping pings for this session.")
  97. break # 连接已断开
  98. print(f"[{self.symbol}] Ping thread finished.")
  99. def _on_open(self, wsapp):
  100. print(f"[{self.symbol}] Connection opened....")
  101. self.current_reconnect_delay = self.initial_reconnect_delay # 重置重连延迟
  102. subscription_payload = {
  103. "method": "SUBSCRIPTION",
  104. "params": self.subscriptions
  105. }
  106. try:
  107. print(f"[{self.symbol}] Sending subscription: {json.dumps(subscription_payload)}")
  108. wsapp.send(json.dumps(subscription_payload))
  109. print(f"[{self.symbol}] Subscription sent.")
  110. # 启动 Ping 线程
  111. if self.ping_thread and self.ping_thread.is_alive(): # 确保旧的ping线程结束
  112. self.ping_thread.join(timeout=1)
  113. self.ping_thread = threading.Thread(target=self._send_ping, daemon=True)
  114. self.ping_thread.start()
  115. print(f"[{self.symbol}] Ping thread started.")
  116. except Exception as e:
  117. print(f"[{self.symbol}] Error during on_open: {e}")
  118. # 如果 on_open 失败,可能需要主动关闭 ws_app 以触发重连
  119. if self.ws_app:
  120. self.ws_app.close()
  121. def _connect_and_run_forever(self):
  122. print(f"[{self.symbol}] Attempting to connect to {self.base_url}...")
  123. self.ws_app = websocket.WebSocketApp(self.base_url,
  124. on_message=self._on_message,
  125. on_error=self._on_error,
  126. on_close=self._on_close,
  127. on_open=self._on_open)
  128. try:
  129. self.ws_app.run_forever() # 此调用会阻塞直到连接关闭
  130. except Exception as e:
  131. print(f"[{self.symbol}] Exception in WebSocketApp.run_forever scope: {e}")
  132. # run_forever 返回后,清理 ws_app 以便下次重连可以创建新的实例
  133. self.ws_app = None
  134. def _run(self):
  135. """主运行循环,包含断线重连逻辑"""
  136. while not self.shutdown_signal.is_set():
  137. self._connect_and_run_forever() # This blocks until connection closes
  138. if self.shutdown_signal.is_set() or self.user_initiated_close:
  139. break # 如果是主动关闭或应用关闭信号,则不重连
  140. print(f"[{self.symbol}] Disconnected. Will attempt to reconnect in {self.current_reconnect_delay} seconds.")
  141. if self.shutdown_signal.wait(timeout=self.current_reconnect_delay):
  142. print(f"[{self.symbol}] Shutdown signal received during reconnect delay.")
  143. break # 在等待期间收到关闭信号
  144. self.current_reconnect_delay = min(self.current_reconnect_delay * 2, self.max_reconnect_delay)
  145. print(f"[{self.symbol}] Main client loop finished.")
  146. def start(self):
  147. """启动 WebSocket 客户端。此方法会启动一个新的线程来运行客户端。"""
  148. if self.main_thread and self.main_thread.is_alive():
  149. print(f"[{self.symbol}] Client is already running.")
  150. return
  151. print(f"[{self.symbol}] Starting client...")
  152. self.shutdown_signal.clear() # 重置关闭信号
  153. self.user_initiated_close = False # 重置用户关闭标记
  154. self.main_thread = threading.Thread(target=self._run, daemon=True)
  155. self.main_thread.start()
  156. def stop(self):
  157. """停止 WebSocket 客户端。"""
  158. print(f"[{self.symbol}] Initiating graceful shutdown...")
  159. self.user_initiated_close = True
  160. self.shutdown_signal.set() # 通知所有循环停止
  161. if self.ws_app:
  162. print(f"[{self.symbol}] Attempting to close WebSocket connection...")
  163. self.ws_app.close() # 这会触发 _on_close
  164. if self.ping_thread and self.ping_thread.is_alive():
  165. self.ping_thread.join(timeout=5) # 等待 ping 线程结束
  166. if self.ping_thread.is_alive():
  167. print(f"[{self.symbol}] Ping thread did not terminate gracefully.")
  168. if self.main_thread and self.main_thread.is_alive():
  169. self.main_thread.join(timeout=10) # 等待主线程结束
  170. if self.main_thread.is_alive():
  171. print(f"[{self.symbol}] Main client thread did not terminate gracefully.")
  172. print(f"[{self.symbol}] Client shutdown process complete.")
  173. def join(self, timeout=None):
  174. """
  175. 等待客户端主线程结束。
  176. 这在主程序需要等待客户端完成时很有用。
  177. """
  178. if self.main_thread and self.main_thread.is_alive():
  179. self.main_thread.join(timeout)
  180. # --- 外部调用示例 ---
  181. # 全局变量,以便信号处理器可以访问客户端实例
  182. active_clients: List[MexcWSClient] = []
  183. def handle_ctrl_c(sig, frame):
  184. print("\nCtrl+C received. Stopping all clients...")
  185. for client_instance in active_clients:
  186. client_instance.stop()
  187. # 给所有客户端一点时间关闭
  188. time.sleep(2)
  189. # 可以选择在这里 sys.exit(0) 或让主程序自然结束
  190. def my_btc_message_handler(data: Dict[str, Any]):
  191. # 这是用户定义的处理 BTCUSDT 消息的函数
  192. print(f"[BTC_HANDLER] Received data: {data}")
  193. # if 'c' in data and data['c'] == 'spot@public.increase.depth.v3.api@BTCUSDT':
  194. # if 'd' in data and 'asks' in data['d'] and data['d']['asks']:
  195. # first_ask_price = data['d']['asks'][0][0]
  196. # print(f"[BTC_HANDLER] BTCUSDT First Ask Price: {first_ask_price}")
  197. # # else:
  198. # # print(f"[BTC_HANDLER] No asks data in depth update: {data.get('d')}")
  199. # def my_eth_message_handler(data: Dict[str, Any]):
  200. # # 这是用户定义的处理 ETHUSDT 消息的函数
  201. # # 这是用户定义的处理 BTCUSDT 消息的函数
  202. # print(f"[ETH_HANDLER] Received data: {data}")
  203. # # if 'c' in data and data['c'] == 'spot@public.deals.v3.api@ETHUSDT':
  204. # # if 'd' in data and 'deals' in data['d'] and data['d']['deals']:
  205. # # for deal in data['d']['deals']:
  206. # # print(f"[ETH_HANDLER] ETHUSDT Deal: Price={deal.get('p')}, Qty={deal.get('q')}, Time={deal.get('t')}")
  207. if __name__ == "__main__":
  208. # 注册 Ctrl+C 信号处理器
  209. signal.signal(signal.SIGINT, handle_ctrl_c)
  210. signal.signal(signal.SIGTERM, handle_ctrl_c)
  211. print("Application starting. Press Ctrl+C to stop.")
  212. # 创建并启动第一个客户端 (BTCUSDT 增量深度)
  213. btc_client = MexcWSClient(symbol="BTCUSDT", on_message_callback=my_btc_message_handler)
  214. active_clients.append(btc_client)
  215. btc_client.start()
  216. # # 创建并启动第二个客户端 (ETHUSDT 成交记录)
  217. # # 使用自定义订阅频道
  218. # eth_subscriptions = ["spot@public.deals.v3.api@ETHUSDT"]
  219. # eth_client = MexcWSClient(
  220. # symbol="ETHUSDT",
  221. # on_message_callback=my_eth_message_handler,
  222. # subscriptions=eth_subscriptions
  223. # )
  224. # active_clients.append(eth_client)
  225. # eth_client.start()
  226. # 主程序保持运行,直到接收到关闭信号
  227. try:
  228. while True:
  229. all_stopped = True
  230. for client in active_clients:
  231. if client.main_thread and client.main_thread.is_alive():
  232. all_stopped = False
  233. break
  234. if all_stopped and any(client.user_initiated_close for client in active_clients): # 确保是用户主动关闭
  235. print("All clients have stopped. Exiting main loop.")
  236. break
  237. time.sleep(1) # 避免CPU空转
  238. except KeyboardInterrupt: # 理论上 signal_handler会处理,这里作为后备
  239. print("Main loop interrupted by KeyboardInterrupt.")
  240. finally:
  241. print("Cleaning up remaining client resources (if any)...")
  242. for client in active_clients:
  243. if not client.user_initiated_close: # 如果某个客户端不是由信号处理器停止的
  244. client.stop() # 确保停止
  245. client.join(timeout=5) # 最后尝试join
  246. print("Application shut down.")