# binance_order_flow.py
  1. import asyncio
  2. import queue
  3. import threading
  4. import time
  5. import websocket
  6. import json
  7. import pandas as pd
  8. import warnings
  9. import logging
  10. import colorlog
  11. import websockets
  12. # 忽略 FutureWarning
  13. warnings.simplefilter(action='ignore', category=FutureWarning)
  14. # 配置日志
  15. handler = colorlog.StreamHandler()
  16. handler.setFormatter(colorlog.ColoredFormatter(
  17. "%(log_color)s%(asctime)s - %(name)s - %(levelname)s \n %(message)s",
  18. datefmt=None,
  19. reset=True,
  20. log_colors={
  21. 'DEBUG': 'cyan',
  22. 'INFO': 'blue',
  23. 'WARNING': 'yellow',
  24. 'ERROR': 'red',
  25. 'CRITICAL': 'bold_red',
  26. }
  27. ))
  28. logger = logging.getLogger("market_monitor")
  29. logger.setLevel(logging.INFO)
  30. logger.addHandler(handler)
  31. # Binance WebSocket API URL
  32. SOCKET_TRADE = "wss://stream.binance.com:9443/ws/btcusdt@trade"
  33. SOCKET_DEPTH = "wss://stream.binance.com:9443/ws/btcusdt@depth20@100ms"
  34. # Initialize the DataFrame
  35. df_trades = pd.DataFrame(columns=['price', 'qty', 'timestamp'])
  36. df_order_book = pd.DataFrame(columns=['bid_price', 'bid_qty', 'ask_price', 'ask_qty'])
  37. previous_order_book = None
  38. fill_probabilities = {}
  39. order_disappearances = {}
  40. order_executions = {}
  41. last_trade = {'price': None, 'qty': None, 'side': None}
  42. messages = queue.Queue() # 创建一个线程安全队列
  43. def on_message_trade(_ws, message):
  44. global df_trades, order_executions, last_trade
  45. json_message = json.loads(message)
  46. trade = {
  47. 'price': float(json_message['p']),
  48. 'qty': float(json_message['q']),
  49. 'timestamp': pd.to_datetime(json_message['T'], unit='ms'),
  50. 'side': 'buy' if json_message['m'] else 'sell' # 'm' indicates whether the buyer is the market maker
  51. }
  52. trade_df = pd.DataFrame([trade])
  53. if not trade_df.empty and not trade_df.isna().all().all():
  54. df_trades = pd.concat([df_trades, trade_df], ignore_index=True)
  55. # 记录每个价格的实际成交总量
  56. price = trade['price']
  57. last_trade = {'price': price, 'qty': trade['qty'], 'side': trade['side']}
  58. if price not in order_executions:
  59. order_executions[price] = 0
  60. order_executions[price] += trade['qty']
  61. calculate_fill_probabilities()
  62. def on_message_depth(_ws, message):
  63. global df_order_book, order_disappearances, previous_order_book
  64. json_message = json.loads(message)
  65. bids = json_message['bids'][:10] # Top 10 bids
  66. asks = json_message['asks'][:10] # Top 10 asks
  67. order_book = {
  68. 'bid_price': [float(bid[0]) for bid in bids],
  69. 'bid_qty': [float(bid[1]) for bid in bids],
  70. 'ask_price': [float(ask[0]) for ask in asks],
  71. 'ask_qty': [float(ask[1]) for ask in asks]
  72. }
  73. df_order_book = pd.DataFrame([order_book])
  74. if previous_order_book is not None:
  75. # 计算订单消失量
  76. for level in range(10):
  77. bid_price = df_order_book['bid_price'].iloc[0][level]
  78. ask_price = df_order_book['ask_price'].iloc[0][level]
  79. bid_qty = df_order_book['bid_qty'].iloc[0][level]
  80. ask_qty = df_order_book['ask_qty'].iloc[0][level]
  81. prev_bid_qty = previous_order_book['bid_qty'].iloc[0][level]
  82. prev_ask_qty = previous_order_book['ask_qty'].iloc[0][level]
  83. # 计算bid订单消失量
  84. if bid_price not in order_disappearances:
  85. order_disappearances[bid_price] = 0
  86. if prev_bid_qty > bid_qty:
  87. order_disappearances[bid_price] += (prev_bid_qty - bid_qty)
  88. # 计算ask订单消失量
  89. if ask_price not in order_disappearances:
  90. order_disappearances[ask_price] = 0
  91. if prev_ask_qty > ask_qty:
  92. order_disappearances[ask_price] += (prev_ask_qty - ask_qty)
  93. previous_order_book = df_order_book
  94. calculate_fill_probabilities()
  95. def on_error(_ws, error):
  96. logger.error(error)
  97. def on_open(_ws):
  98. print("### opened ###")
  99. # Create a WebSocket app
  100. ws_trade = websocket.WebSocketApp(SOCKET_TRADE, on_message=on_message_trade, on_error=on_error)
  101. ws_depth = websocket.WebSocketApp(SOCKET_DEPTH, on_message=on_message_depth, on_error=on_error)
  102. # 定义要传递给 run_forever 的参数
  103. http_proxy_host = "127.0.0.1"
  104. http_proxy_port = 7890
  105. proxy_type = "http"
  106. # Run the WebSocket with proxy settings
  107. trade_thread = threading.Thread(target=ws_trade.run_forever, kwargs={
  108. 'http_proxy_host': http_proxy_host,
  109. 'http_proxy_port': http_proxy_port,
  110. 'proxy_type': proxy_type
  111. })
  112. depth_thread = threading.Thread(target=ws_depth.run_forever, kwargs={
  113. 'http_proxy_host': http_proxy_host,
  114. 'http_proxy_port': http_proxy_port,
  115. 'proxy_type': proxy_type
  116. })
  117. trade_thread.start()
  118. depth_thread.start()
  119. stop_event = threading.Event()
  120. # Function to calculate fill probabilities
  121. def calculate_fill_probabilities():
  122. global order_executions, order_disappearances, fill_probabilities
  123. fill_probabilities = {}
  124. for price in order_disappearances:
  125. if price in order_executions:
  126. disappearances = order_disappearances[price]
  127. executions = order_executions[price]
  128. # 确保成交概率不大于1
  129. fill_probabilities[price] = min(executions / disappearances, 1) if disappearances > 0 else 0
  130. else:
  131. fill_probabilities[price] = 0
  132. # Function to periodically log and save fill probabilities
  133. def log_and_save_fill_probabilities():
  134. global df_order_book, fill_probabilities
  135. while not stop_event.is_set():
  136. if fill_probabilities and not df_order_book.empty:
  137. asks = [[price, fill_probabilities[price]] for price in df_order_book['ask_price'].iloc[0] if price in fill_probabilities]
  138. bids = [[price, fill_probabilities[price]] for price in df_order_book['bid_price'].iloc[0] if price in fill_probabilities]
  139. last_price = last_trade['price']
  140. last_qty = last_trade['qty']
  141. side = last_trade['side']
  142. data = {
  143. "asks": asks,
  144. "bids": bids,
  145. "last_price": last_price,
  146. "last_qty": last_qty,
  147. "side": side,
  148. "time": int(time.time() * 1000)
  149. }
  150. messages.put(data)
  151. # logger.info("Market Snapshot:\n%s", json.dumps(data))
  152. stop_event.wait(0.1) # 每5秒更新一次
  153. # 启动定期保存和打印线程
  154. log_and_save_thread = threading.Thread(target=log_and_save_fill_probabilities)
  155. log_and_save_thread.start()
  156. async def ws_inited(ws, path):
  157. logger.info("客户端已连接上,ws://localhost:6789初始化完毕")
  158. while True:
  159. message = await asyncio.get_running_loop().run_in_executor(None, messages.get)
  160. # 检查连接是否仍然开放
  161. if ws.open:
  162. message_data = json.dumps(message) # 将字典序列化为JSON字符串
  163. await ws.send(message_data)
  164. else:
  165. logger.info("WebSocket 连接已关闭")
  166. break
  167. async def start_websocket_server():
  168. server = await websockets.serve(ws_inited, "localhost", 6789)
  169. logger.info("WebSocket 服务器启动在 ws://localhost:6789")
  170. await server.wait_closed()
  171. def stop_all_threads():
  172. stop_event.set()
  173. trade_thread.join()
  174. depth_thread.join()
  175. log_and_save_thread.join()
  176. # 启动 WebSocket 服务器
  177. asyncio.run(start_websocket_server())
  178. # 停止所有线程(在需要停止时调用)
  179. # stop_all_threads()