data_processing.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. import json
  2. import pandas as pd
  3. import time
  4. import queue
  5. import threading
  6. from logger_config import logger
  7. # Initialize the DataFrame
  8. df_trades = pd.DataFrame(columns=['price', 'qty', 'timestamp'])
  9. df_order_book = pd.DataFrame(columns=['bid_price', 'bid_qty', 'ask_price', 'ask_qty'])
  10. previous_order_book = None
  11. fill_probabilities = {}
  12. order_disappearances = {}
  13. order_executions = {}
  14. last_trade = {'price': None, 'qty': None, 'side': None}
  15. messages = queue.Queue() # 创建一个线程安全队列
  16. stop_event = threading.Event()
  17. def on_message_trade(_ws, message):
  18. global df_trades, order_executions, last_trade
  19. json_message = json.loads(message)
  20. trade = {
  21. 'price': float(json_message['data']['p']),
  22. 'qty': float(json_message['data']['q']),
  23. 'timestamp': pd.to_datetime(json_message['data']['T'], unit='ms'),
  24. 'side': 'buy' if json_message['data']['m'] else 'sell' # 'm' indicates是否买方是做市商
  25. }
  26. trade_df = pd.DataFrame([trade])
  27. if not trade_df.empty and not trade_df.isna().all().all():
  28. df_trades = pd.concat([df_trades, trade_df], ignore_index=True)
  29. # 记录每个价格的实际成交总量
  30. price = trade['price']
  31. last_trade = {'price': price, 'qty': trade['qty'], 'side': trade['side']}
  32. if price not in order_executions:
  33. order_executions[price] = 0
  34. order_executions[price] += trade['qty']
  35. calculate_fill_probabilities()
  36. def on_message_depth(_ws, message):
  37. global df_order_book, order_disappearances, previous_order_book
  38. json_message = json.loads(message)
  39. bids = json_message['data']['b'][:10] # Top 10 bids
  40. asks = json_message['data']['a'][:10] # Top 10 asks
  41. order_book = {
  42. 'bid_price': [float(bid[0]) for bid in bids],
  43. 'bid_qty': [float(bid[1]) for bid in bids],
  44. 'ask_price': [float(ask[0]) for ask in asks],
  45. 'ask_qty': [float(ask[1]) for ask in asks]
  46. }
  47. df_order_book = pd.DataFrame([order_book])
  48. if previous_order_book is not None:
  49. # 计算订单消失量
  50. for level in range(10):
  51. bid_price = df_order_book['bid_price'].iloc[0][level]
  52. ask_price = df_order_book['ask_price'].iloc[0][level]
  53. bid_qty = df_order_book['bid_qty'].iloc[0][level]
  54. ask_qty = df_order_book['ask_qty'].iloc[0][level]
  55. prev_bid_qty = previous_order_book['bid_qty'].iloc[0][level]
  56. prev_ask_qty = previous_order_book['ask_qty'].iloc[0][level]
  57. # 计算bid订单消失量
  58. if bid_price not in order_disappearances:
  59. order_disappearances[bid_price] = 0
  60. if prev_bid_qty > bid_qty:
  61. disappearances = (prev_bid_qty - bid_qty)
  62. order_disappearances[bid_price] += disappearances if disappearances > 0 else 0
  63. # 计算ask订单消失量
  64. if ask_price not in order_disappearances:
  65. order_disappearances[ask_price] = 0
  66. if prev_ask_qty > ask_qty:
  67. disappearances = (prev_ask_qty - ask_qty)
  68. order_disappearances[ask_price] += disappearances if disappearances > 0 else 0
  69. previous_order_book = df_order_book
  70. calculate_fill_probabilities()
  71. # 计算成交概率
  72. def calculate_fill_probabilities():
  73. global order_executions, order_disappearances, fill_probabilities, df_order_book
  74. for price in order_disappearances:
  75. if price in order_executions:
  76. disappearances = order_disappearances[price]
  77. executions = order_executions[price]
  78. # 确保成交概率不大于1
  79. fill_probabilities[price] = executions / disappearances if disappearances > 0 else 0
  80. else:
  81. fill_probabilities[price] = 0
  82. if fill_probabilities and not df_order_book.empty and last_trade['price'] is not None:
  83. last_price = last_trade['price']
  84. asks = [[price, fill_probabilities[price]] for price in fill_probabilities.keys() if
  85. price > last_price and price in fill_probabilities]
  86. bids = [[price, fill_probabilities[price]] for price in fill_probabilities.keys() if
  87. price < last_price and price in fill_probabilities]
  88. asks_sorted = sorted(asks, key=lambda x: x[0])
  89. bids_sorted = sorted(bids, key=lambda x: x[0], reverse=True)
  90. # last_qty = last_trade['qty']
  91. last_qty = 0
  92. side = last_trade['side']
  93. data = {
  94. "asks": asks_sorted,
  95. "bids": bids_sorted,
  96. "last_price": last_price,
  97. "last_qty": last_qty,
  98. "side": side,
  99. "time": int(time.time() * 1000)
  100. }
  101. messages.put(data)