# binance_order_flow.py
  1. import threading
  2. import websocket
  3. import json
  4. import pandas as pd
  5. import numpy as np
  6. import time
  7. import warnings
  8. import logging
  9. import colorlog
  10. # 忽略 FutureWarning
  11. warnings.simplefilter(action='ignore', category=FutureWarning)
  12. # 配置日志
  13. handler = colorlog.StreamHandler()
  14. handler.setFormatter(colorlog.ColoredFormatter(
  15. "%(log_color)s%(asctime)s - %(name)s - %(levelname)s \n %(message)s",
  16. datefmt=None,
  17. reset=True,
  18. log_colors={
  19. 'DEBUG': 'cyan',
  20. 'INFO': 'blue',
  21. 'WARNING': 'yellow',
  22. 'ERROR': 'red',
  23. 'CRITICAL': 'bold_red',
  24. }
  25. ))
  26. logger = logging.getLogger("market_monitor")
  27. logger.setLevel(logging.INFO)
  28. logger.addHandler(handler)
  29. # Binance WebSocket API URL
  30. SOCKET_TRADE = "wss://stream.binance.com:9443/ws/btcusdt@trade"
  31. SOCKET_DEPTH = "wss://stream.binance.com:9443/ws/btcusdt@depth20@100ms"
  32. # Initialize the DataFrame
  33. df_trades = pd.DataFrame(columns=['price', 'qty', 'timestamp'])
  34. df_order_book = pd.DataFrame(columns=['bid_price', 'bid_qty', 'ask_price', 'ask_qty'])
  35. previous_order_book = None
  36. fill_probabilities = {}
  37. order_disappearances = {}
  38. order_executions = {}
  39. def on_message_trade(_ws, message):
  40. global df_trades, order_executions
  41. json_message = json.loads(message)
  42. trade = {
  43. 'price': float(json_message['p']),
  44. 'qty': float(json_message['q']),
  45. 'timestamp': pd.to_datetime(json_message['T'], unit='ms')
  46. }
  47. trade_df = pd.DataFrame([trade])
  48. if not trade_df.empty and not trade_df.isna().all().all():
  49. df_trades = pd.concat([df_trades, trade_df], ignore_index=True)
  50. # 记录每个价格的实际成交总量
  51. price = trade['price']
  52. if price not in order_executions:
  53. order_executions[price] = 0
  54. order_executions[price] += trade['qty']
  55. def on_message_depth(_ws, message):
  56. global df_order_book, order_disappearances, previous_order_book
  57. json_message = json.loads(message)
  58. bids = json_message['bids'][:10] # Top 10 bids
  59. asks = json_message['asks'][:10] # Top 10 asks
  60. order_book = {
  61. 'bid_price': [float(bid[0]) for bid in bids],
  62. 'bid_qty': [float(bid[1]) for bid in bids],
  63. 'ask_price': [float(ask[0]) for ask in asks],
  64. 'ask_qty': [float(ask[1]) for ask in asks]
  65. }
  66. current_order_book = pd.DataFrame([order_book])
  67. if previous_order_book is not None:
  68. # 计算订单消失量
  69. for level in range(10):
  70. bid_price = current_order_book['bid_price'].iloc[0][level]
  71. ask_price = current_order_book['ask_price'].iloc[0][level]
  72. bid_qty = current_order_book['bid_qty'].iloc[0][level]
  73. ask_qty = current_order_book['ask_qty'].iloc[0][level]
  74. prev_bid_qty = previous_order_book['bid_qty'].iloc[0][level]
  75. prev_ask_qty = previous_order_book['ask_qty'].iloc[0][level]
  76. # 计算bid订单消失量
  77. if bid_price not in order_disappearances:
  78. order_disappearances[bid_price] = 0
  79. if prev_bid_qty > bid_qty:
  80. order_disappearances[bid_price] += (prev_bid_qty - bid_qty)
  81. # 计算ask订单消失量
  82. if ask_price not in order_disappearances:
  83. order_disappearances[ask_price] = 0
  84. if prev_ask_qty > ask_qty:
  85. order_disappearances[ask_price] += (prev_ask_qty - ask_qty)
  86. previous_order_book = current_order_book
  87. def on_error(_ws, error):
  88. logger.error(error)
  89. def on_open(_ws):
  90. print("### opened ###")
  91. # Create a WebSocket app
  92. ws_trade = websocket.WebSocketApp(SOCKET_TRADE, on_message=on_message_trade, on_error=on_error)
  93. ws_depth = websocket.WebSocketApp(SOCKET_DEPTH, on_message=on_message_depth, on_error=on_error)
  94. # 定义要传递给 run_forever 的参数
  95. http_proxy_host = "127.0.0.1"
  96. http_proxy_port = 7890
  97. proxy_type = "http"
  98. # Run the WebSocket with proxy settings
  99. trade_thread = threading.Thread(target=ws_trade.run_forever, kwargs={
  100. 'http_proxy_host': http_proxy_host,
  101. 'http_proxy_port': http_proxy_port,
  102. 'proxy_type': proxy_type
  103. })
  104. depth_thread = threading.Thread(target=ws_depth.run_forever, kwargs={
  105. 'http_proxy_host': http_proxy_host,
  106. 'http_proxy_port': http_proxy_port,
  107. 'proxy_type': proxy_type
  108. })
  109. trade_thread.start()
  110. depth_thread.start()
  111. stop_event = threading.Event()
  112. # Function to calculate fill probabilities
  113. def calculate_fill_probabilities():
  114. global order_executions, order_disappearances, fill_probabilities
  115. fill_probabilities = {}
  116. for price in order_disappearances:
  117. if price in order_executions:
  118. disappearances = order_disappearances[price]
  119. executions = order_executions[price]
  120. # 确保成交概率不大于1
  121. fill_probabilities[price] = min(executions / disappearances, 1) if disappearances > 0 else 0
  122. else:
  123. fill_probabilities[price] = 0
  124. # Function to periodically log fill probabilities
  125. def log_fill_probabilities_periodically():
  126. while not stop_event.is_set():
  127. calculate_fill_probabilities()
  128. if fill_probabilities:
  129. logger.info("Fill Probabilities:\n%s", repr(fill_probabilities))
  130. stop_event.wait(5) # 每5秒打印一次
  131. # 启动定期打印线程
  132. log_fill_probabilities_thread = threading.Thread(target=log_fill_probabilities_periodically)
  133. log_fill_probabilities_thread.start()
  134. def stop_all_threads():
  135. stop_event.set()
  136. trade_thread.join()
  137. depth_thread.join()
  138. log_fill_probabilities_thread.join()
# Stop all threads (call this when the monitor needs to shut down).
# stop_all_threads()