"""binance_order_flow.py

Streams Binance BTCUSDT trade and order-book depth data, estimates a
per-price fill probability from it, and pushes periodic snapshots to
clients connected to a local WebSocket server (ws://localhost:6789).
"""
  1. import asyncio
  2. import queue
  3. import threading
  4. import websocket
  5. import json
  6. import pandas as pd
  7. import warnings
  8. import logging
  9. import colorlog
  10. import websockets
  11. # 忽略 FutureWarning
  12. warnings.simplefilter(action='ignore', category=FutureWarning)
  13. # 配置日志
  14. handler = colorlog.StreamHandler()
  15. handler.setFormatter(colorlog.ColoredFormatter(
  16. "%(log_color)s%(asctime)s - %(name)s - %(levelname)s \n %(message)s",
  17. datefmt=None,
  18. reset=True,
  19. log_colors={
  20. 'DEBUG': 'cyan',
  21. 'INFO': 'blue',
  22. 'WARNING': 'yellow',
  23. 'ERROR': 'red',
  24. 'CRITICAL': 'bold_red',
  25. }
  26. ))
  27. logger = logging.getLogger("market_monitor")
  28. logger.setLevel(logging.INFO)
  29. logger.addHandler(handler)
  30. # Binance WebSocket API URL
  31. SOCKET_TRADE = "wss://stream.binance.com:9443/ws/btcusdt@trade"
  32. SOCKET_DEPTH = "wss://stream.binance.com:9443/ws/btcusdt@depth20@100ms"
  33. # Initialize the DataFrame
  34. df_trades = pd.DataFrame(columns=['price', 'qty', 'timestamp'])
  35. df_order_book = pd.DataFrame(columns=['bid_price', 'bid_qty', 'ask_price', 'ask_qty'])
  36. previous_order_book = None
  37. fill_probabilities = {}
  38. order_disappearances = {}
  39. order_executions = {}
  40. last_trade = {'price': None, 'qty': None, 'side': None}
  41. messages = queue.Queue() # 创建一个线程安全队列
  42. def on_message_trade(_ws, message):
  43. global df_trades, order_executions, last_trade
  44. json_message = json.loads(message)
  45. trade = {
  46. 'price': float(json_message['p']),
  47. 'qty': float(json_message['q']),
  48. 'timestamp': pd.to_datetime(json_message['T'], unit='ms'),
  49. 'side': 'buy' if json_message['m'] else 'sell' # 'm' indicates whether the buyer is the market maker
  50. }
  51. trade_df = pd.DataFrame([trade])
  52. if not trade_df.empty and not trade_df.isna().all().all():
  53. df_trades = pd.concat([df_trades, trade_df], ignore_index=True)
  54. # 记录每个价格的实际成交总量
  55. price = trade['price']
  56. last_trade = {'price': price, 'qty': trade['qty'], 'side': trade['side']}
  57. if price not in order_executions:
  58. order_executions[price] = 0
  59. order_executions[price] += trade['qty']
  60. calculate_fill_probabilities()
  61. def on_message_depth(_ws, message):
  62. global df_order_book, order_disappearances, previous_order_book
  63. json_message = json.loads(message)
  64. bids = json_message['bids'][:10] # Top 10 bids
  65. asks = json_message['asks'][:10] # Top 10 asks
  66. order_book = {
  67. 'bid_price': [float(bid[0]) for bid in bids],
  68. 'bid_qty': [float(bid[1]) for bid in bids],
  69. 'ask_price': [float(ask[0]) for ask in asks],
  70. 'ask_qty': [float(ask[1]) for ask in asks]
  71. }
  72. df_order_book = pd.DataFrame([order_book])
  73. if previous_order_book is not None:
  74. # 计算订单消失量
  75. for level in range(10):
  76. bid_price = df_order_book['bid_price'].iloc[0][level]
  77. ask_price = df_order_book['ask_price'].iloc[0][level]
  78. bid_qty = df_order_book['bid_qty'].iloc[0][level]
  79. ask_qty = df_order_book['ask_qty'].iloc[0][level]
  80. prev_bid_qty = previous_order_book['bid_qty'].iloc[0][level]
  81. prev_ask_qty = previous_order_book['ask_qty'].iloc[0][level]
  82. # 计算bid订单消失量
  83. if bid_price not in order_disappearances:
  84. order_disappearances[bid_price] = 0
  85. if prev_bid_qty > bid_qty:
  86. order_disappearances[bid_price] += (prev_bid_qty - bid_qty)
  87. # 计算ask订单消失量
  88. if ask_price not in order_disappearances:
  89. order_disappearances[ask_price] = 0
  90. if prev_ask_qty > ask_qty:
  91. order_disappearances[ask_price] += (prev_ask_qty - ask_qty)
  92. previous_order_book = df_order_book
  93. calculate_fill_probabilities()
  94. def on_error(_ws, error):
  95. logger.error(error)
  96. def on_open(_ws):
  97. print("### opened ###")
  98. # Create a WebSocket app
  99. ws_trade = websocket.WebSocketApp(SOCKET_TRADE, on_message=on_message_trade, on_error=on_error)
  100. ws_depth = websocket.WebSocketApp(SOCKET_DEPTH, on_message=on_message_depth, on_error=on_error)
  101. # 定义要传递给 run_forever 的参数
  102. http_proxy_host = "127.0.0.1"
  103. http_proxy_port = 7890
  104. proxy_type = "http"
  105. # Run the WebSocket with proxy settings
  106. trade_thread = threading.Thread(target=ws_trade.run_forever, kwargs={
  107. 'http_proxy_host': http_proxy_host,
  108. 'http_proxy_port': http_proxy_port,
  109. 'proxy_type': proxy_type
  110. })
  111. depth_thread = threading.Thread(target=ws_depth.run_forever, kwargs={
  112. 'http_proxy_host': http_proxy_host,
  113. 'http_proxy_port': http_proxy_port,
  114. 'proxy_type': proxy_type
  115. })
  116. trade_thread.start()
  117. depth_thread.start()
  118. stop_event = threading.Event()
  119. # Function to calculate fill probabilities
  120. def calculate_fill_probabilities():
  121. global order_executions, order_disappearances, fill_probabilities
  122. fill_probabilities = {}
  123. for price in order_disappearances:
  124. if price in order_executions:
  125. disappearances = order_disappearances[price]
  126. executions = order_executions[price]
  127. # 确保成交概率不大于1
  128. fill_probabilities[price] = min(executions / disappearances, 1) if disappearances > 0 else 0
  129. else:
  130. fill_probabilities[price] = 0
  131. # Function to periodically log and save fill probabilities
  132. def log_and_save_fill_probabilities():
  133. global df_order_book, fill_probabilities
  134. while not stop_event.is_set():
  135. if fill_probabilities and not df_order_book.empty:
  136. asks = [[price, fill_probabilities[price]] for price in df_order_book['ask_price'].iloc[0] if price in fill_probabilities]
  137. bids = [[price, fill_probabilities[price]] for price in df_order_book['bid_price'].iloc[0] if price in fill_probabilities]
  138. last_price = last_trade['price']
  139. last_qty = last_trade['qty']
  140. side = last_trade['side']
  141. data = {
  142. "asks": asks,
  143. "bids": bids,
  144. "last_price": last_price,
  145. "last_qty": last_qty,
  146. "side": side
  147. }
  148. messages.put(data)
  149. # logger.info("Market Snapshot:\n%s", json.dumps(data))
  150. stop_event.wait(1) # 每5秒更新一次
  151. # 启动定期保存和打印线程
  152. log_and_save_thread = threading.Thread(target=log_and_save_fill_probabilities)
  153. log_and_save_thread.start()
  154. async def ws_inited(ws, path):
  155. logger.info("客户端已连接上,ws://localhost:6789初始化完毕")
  156. while True:
  157. message = await asyncio.get_running_loop().run_in_executor(None, messages.get)
  158. # 检查连接是否仍然开放
  159. if ws.open:
  160. message_data = json.dumps(message) # 将字典序列化为JSON字符串
  161. await ws.send(message_data)
  162. else:
  163. logger.info("WebSocket 连接已关闭")
  164. break
  165. async def start_websocket_server():
  166. server = await websockets.serve(ws_inited, "localhost", 6789)
  167. logger.info("WebSocket 服务器启动在 ws://localhost:6789")
  168. await server.wait_closed()
  169. def stop_all_threads():
  170. stop_event.set()
  171. trade_thread.join()
  172. depth_thread.join()
  173. log_and_save_thread.join()
  174. # 启动 WebSocket 服务器
  175. asyncio.run(start_websocket_server())
  176. # 停止所有线程(在需要停止时调用)
  177. # stop_all_threads()