binance_order_flow.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. import threading
  2. import websocket
  3. import json
  4. import pandas as pd
  5. import warnings
  6. import logging
  7. import colorlog
  8. # 忽略 FutureWarning
  9. warnings.simplefilter(action='ignore', category=FutureWarning)
  10. # 配置日志
  11. handler = colorlog.StreamHandler()
  12. handler.setFormatter(colorlog.ColoredFormatter(
  13. "%(log_color)s%(asctime)s - %(name)s - %(levelname)s \n %(message)s",
  14. datefmt=None,
  15. reset=True,
  16. log_colors={
  17. 'DEBUG': 'cyan',
  18. 'INFO': 'blue',
  19. 'WARNING': 'yellow',
  20. 'ERROR': 'red',
  21. 'CRITICAL': 'bold_red',
  22. }
  23. ))
  24. logger = logging.getLogger("market_monitor")
  25. logger.setLevel(logging.INFO)
  26. logger.addHandler(handler)
  27. # Binance WebSocket API URL
  28. SOCKET_TRADE = "wss://stream.binance.com:9443/ws/btcusdt@trade"
  29. SOCKET_DEPTH = "wss://stream.binance.com:9443/ws/btcusdt@depth20@100ms"
  30. # Initialize the DataFrame
  31. df_trades = pd.DataFrame(columns=['price', 'qty', 'timestamp'])
  32. df_order_book = pd.DataFrame(columns=['bid_price', 'bid_qty', 'ask_price', 'ask_qty'])
  33. previous_order_book = None
  34. fill_probabilities = {}
  35. order_disappearances = {}
  36. order_executions = {}
  37. last_trade_price = None
  38. def on_message_trade(_ws, message):
  39. global df_trades, order_executions, last_trade_price
  40. json_message = json.loads(message)
  41. trade = {
  42. 'price': float(json_message['p']),
  43. 'qty': float(json_message['q']),
  44. 'timestamp': pd.to_datetime(json_message['T'], unit='ms')
  45. }
  46. trade_df = pd.DataFrame([trade])
  47. if not trade_df.empty and not trade_df.isna().all().all():
  48. df_trades = pd.concat([df_trades, trade_df], ignore_index=True)
  49. # 记录每个价格的实际成交总量
  50. price = trade['price']
  51. last_trade_price = price
  52. if price not in order_executions:
  53. order_executions[price] = 0
  54. order_executions[price] += trade['qty']
  55. def on_message_depth(_ws, message):
  56. global df_order_book, order_disappearances, previous_order_book
  57. json_message = json.loads(message)
  58. bids = json_message['bids'][:10] # Top 10 bids
  59. asks = json_message['asks'][:10] # Top 10 asks
  60. order_book = {
  61. 'bid_price': [float(bid[0]) for bid in bids],
  62. 'bid_qty': [float(bid[1]) for bid in bids],
  63. 'ask_price': [float(ask[0]) for ask in asks],
  64. 'ask_qty': [float(ask[1]) for ask in asks]
  65. }
  66. current_order_book = pd.DataFrame([order_book])
  67. if previous_order_book is not None:
  68. # 计算订单消失量
  69. for level in range(10):
  70. bid_price = current_order_book['bid_price'].iloc[0][level]
  71. ask_price = current_order_book['ask_price'].iloc[0][level]
  72. bid_qty = current_order_book['bid_qty'].iloc[0][level]
  73. ask_qty = current_order_book['ask_qty'].iloc[0][level]
  74. prev_bid_qty = previous_order_book['bid_qty'].iloc[0][level]
  75. prev_ask_qty = previous_order_book['ask_qty'].iloc[0][level]
  76. # 计算bid订单消失量
  77. if bid_price not in order_disappearances:
  78. order_disappearances[bid_price] = 0
  79. if prev_bid_qty > bid_qty:
  80. order_disappearances[bid_price] += (prev_bid_qty - bid_qty)
  81. # 计算ask订单消失量
  82. if ask_price not in order_disappearances:
  83. order_disappearances[ask_price] = 0
  84. if prev_ask_qty > ask_qty:
  85. order_disappearances[ask_price] += (prev_ask_qty - ask_qty)
  86. previous_order_book = current_order_book
  87. def on_error(_ws, error):
  88. logger.error(error)
  89. def on_open(_ws):
  90. print("### opened ###")
  91. # Create a WebSocket app
  92. ws_trade = websocket.WebSocketApp(SOCKET_TRADE, on_message=on_message_trade, on_error=on_error)
  93. ws_depth = websocket.WebSocketApp(SOCKET_DEPTH, on_message=on_message_depth, on_error=on_error)
  94. # 定义要传递给 run_forever 的参数
  95. http_proxy_host = "127.0.0.1"
  96. http_proxy_port = 7890
  97. proxy_type = "http"
  98. # Run the WebSocket with proxy settings
  99. trade_thread = threading.Thread(target=ws_trade.run_forever, kwargs={
  100. 'http_proxy_host': http_proxy_host,
  101. 'http_proxy_port': http_proxy_port,
  102. 'proxy_type': proxy_type
  103. })
  104. depth_thread = threading.Thread(target=ws_depth.run_forever, kwargs={
  105. 'http_proxy_host': http_proxy_host,
  106. 'http_proxy_port': http_proxy_port,
  107. 'proxy_type': proxy_type
  108. })
  109. trade_thread.start()
  110. depth_thread.start()
  111. stop_event = threading.Event()
  112. # Function to calculate fill probabilities
  113. def calculate_fill_probabilities():
  114. global order_executions, order_disappearances, fill_probabilities
  115. fill_probabilities = {}
  116. for price in order_disappearances:
  117. if price in order_executions:
  118. disappearances = order_disappearances[price]
  119. executions = order_executions[price]
  120. # 确保成交概率不大于1
  121. fill_probabilities[price] = min(executions / disappearances, 1) if disappearances > 0 else 0
  122. else:
  123. fill_probabilities[price] = 0
  124. # Function to periodically log fill probabilities
  125. def log_fill_probabilities_periodically():
  126. while not stop_event.is_set():
  127. calculate_fill_probabilities()
  128. if fill_probabilities:
  129. logger.info("Fill Probabilities:\n%s", repr(fill_probabilities))
  130. stop_event.wait(5) # 每5秒打印一次
  131. # 启动定期打印线程
  132. log_fill_probabilities_thread = threading.Thread(target=log_fill_probabilities_periodically)
  133. log_fill_probabilities_thread.start()
  134. def stop_all_threads():
  135. stop_event.set()
  136. trade_thread.join()
  137. depth_thread.join()
  138. log_fill_probabilities_thread.join()
  139. # 停止所有线程(在需要停止时调用)
  140. # stop_all_threads()