data_processing.py 3.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. import json
  2. import scipy.stats as stats
  3. import numpy as np
  4. import pandas as pd
  5. import queue
  6. import threading
  7. from collections import deque
  8. from logger_config import logger
# Assume we have a data stream of order-book and trade messages.
order_book_snapshots = deque(maxlen=100)  # order-book snapshots from the last 100 ms (at most 100 entries)

# Minimum number of accumulated data points before processing starts.
DATA_THRESHOLD = 20

messages = queue.Queue()  # thread-safe queue for incoming stream messages
stop_event = threading.Event()  # cooperative shutdown signal for worker threads

# Parameters for the kappa calculation.
# NOTE(review): the commented-out formula at the bottom of the file looks like an
# Avellaneda-Stoikov style quote — confirm the intended model before re-enabling it.
gamma = 0.1    # inventory risk-aversion parameter
sigma = 0.02   # market volatility
T_minus_t = 1  # remaining time in the trading session
# NOTE(review): dead code — commented-out trade-stream handler kept for reference.
# Either restore it (it references a `trade_data` container and a
# `predict_market_direction()` helper not defined in this file) or delete it.
# def on_message_trade(_ws, message):
#     global trade_data
#     json_message = json.loads(message)
#     trade = {
#         'price': float(json_message['data']['p']),
#         'qty': float(json_message['data']['q']),
#         'timestamp': pd.to_datetime(json_message['data']['T'], unit='ms'),
#         'side': 'sell' if json_message['data']['m'] else 'buy'
#     }
#     trade_data.append(trade)
#     predict_market_direction()
  30. def on_message_depth(_ws, message):
  31. global order_book_snapshots
  32. json_message = json.loads(message)
  33. bids = json_message['data']['b'][:10] # Top 10 bids
  34. asks = json_message['data']['a'][:10] # Top 10 asks
  35. timestamp = pd.to_datetime(json_message['data']['E'], unit='ms')
  36. order_book_snapshots.append({
  37. 'bids': bids,
  38. 'asks': asks,
  39. 'timestamp': timestamp
  40. })
  41. process_depth_data(bids, asks)
  42. def process_depth_data(bids, asks):
  43. # 提取订单大小数据
  44. order_sizes = []
  45. for bid in bids:
  46. order_sizes.append(float(bid[1]))
  47. for ask in asks:
  48. order_sizes.append(float(ask[1]))
  49. if len(order_sizes) < 2: # 确保有足够的数据进行计算
  50. logger.warning("Not enough data to calculate kappa.")
  51. return
  52. # 使用 MLE 估计幂律指数 β
  53. Q_min = np.min(order_sizes)
  54. n = len(order_sizes)
  55. try:
  56. beta_hat = 1 + n / np.sum(np.log(order_sizes / Q_min))
  57. except ZeroDivisionError:
  58. logger.error("ZeroDivisionError encountered in MLE calculation.")
  59. return
  60. # 使用最小二乘法拟合幂律分布
  61. log_order_sizes = np.log(order_sizes)
  62. hist, bin_edges = np.histogram(log_order_sizes, bins=50, density=True)
  63. bin_centers = (bin_edges[:-1] + bin_edges[1:]) / 2
  64. # 过滤掉频率为零的bin
  65. non_zero_indices = hist > 0
  66. hist = hist[non_zero_indices]
  67. bin_centers = bin_centers[non_zero_indices]
  68. if len(hist) == 0 or len(bin_centers) == 0:
  69. logger.warning("No non-zero bins available for linear regression.")
  70. return
  71. try:
  72. slope, intercept, r_value, p_value, std_err = stats.linregress(bin_centers, np.log(hist))
  73. beta_lsm = -slope
  74. except ValueError as e:
  75. logger.error(f"ValueError encountered in linear regression: {e}")
  76. return
  77. # 计算kappa值
  78. k = beta_hat # 这里假设我们使用MLE估计的β值作为k
  79. logger.info(f"Estimated kappa: {k}")
  80. # kappa = (2 / (gamma * sigma**2 * T_minus_t)) * np.log(1 + (gamma / k))
  81. # logger.info(f"Estimated Kappa: {kappa}")