data_processing.py 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. import json
  2. import scipy.stats as stats
  3. import numpy as np
  4. import pandas as pd
  5. import queue
  6. import threading
  7. from collections import deque
  8. from logger_config import logger
# Module-level state for the incoming market-data stream (order book and
# trade data).
# Bounded buffer: keeps only the most recent 1000 order book snapshots
# (older entries are dropped automatically once maxlen is reached).
order_book_snapshots = deque(maxlen=1000)
# Number of buffered snapshots required before on_message_depth starts
# calling process_depth_data.
DATA_THRESHOLD = 20
messages = queue.Queue()  # Thread-safe FIFO queue
# NOTE(review): neither `messages` nor `stop_event` is referenced in the
# visible part of this file -- confirm their consumers before changing them.
stop_event = threading.Event()
  15. def on_message_depth(_ws, message):
  16. global order_book_snapshots
  17. json_message = json.loads(message)
  18. bids = json_message['data']['b'][:10] # Top 10 bids
  19. asks = json_message['data']['a'][:10] # Top 10 asks
  20. timestamp = pd.to_datetime(json_message['data']['E'], unit='ms')
  21. order_book_snapshots.append({
  22. 'bids': bids,
  23. 'asks': asks,
  24. 'timestamp': timestamp
  25. })
  26. if len(order_book_snapshots) >= DATA_THRESHOLD:
  27. process_depth_data(order_book_snapshots)
  28. def process_depth_data(order_book_snapshots):
  29. # 提取订单大小数据
  30. order_sizes = []
  31. for snapshot in order_book_snapshots:
  32. for bid in snapshot['bids']:
  33. order_sizes.append(float(bid[1]))
  34. for ask in snapshot['asks']:
  35. order_sizes.append(float(ask[1]))
  36. if len(order_sizes) < 2: # 确保有足够的数据进行计算
  37. logger.warning("Not enough data to calculate kappa.")
  38. return
  39. # 使用 MLE 估计幂律指数 β
  40. Q_min = np.min(order_sizes)
  41. n = len(order_sizes)
  42. try:
  43. beta_hat = 1 + n / np.sum(np.log(order_sizes / Q_min))
  44. except ZeroDivisionError:
  45. logger.error("ZeroDivisionError encountered in MLE calculation.")
  46. return
  47. # 使用最小二乘法拟合幂律分布
  48. log_order_sizes = np.log(order_sizes)
  49. hist, bin_edges = np.histogram(log_order_sizes, bins=50, density=True)
  50. bin_centers = (bin_edges[:-1] + bin_edges[1:]) / 2
  51. # 过滤掉频率为零的bin
  52. non_zero_indices = hist > 0
  53. hist = hist[non_zero_indices]
  54. bin_centers = bin_centers[non_zero_indices]
  55. if len(hist) == 0 or len(bin_centers) == 0:
  56. logger.warning("No non-zero bins available for linear regression.")
  57. return
  58. try:
  59. slope, intercept, r_value, p_value, std_err = stats.linregress(bin_centers, np.log(hist))
  60. beta_lsm = -slope
  61. except ValueError as e:
  62. logger.error(f"ValueError encountered in linear regression: {e}")
  63. return
  64. # 计算kappa值
  65. gamma = 0.1 # 库存风险厌恶参数
  66. sigma = 0.02 # 市场波动率
  67. T_minus_t = 1 # 交易会话的剩余时间
  68. # 结合不同方法估算的β值,计算kappa
  69. kappa_mle = (2 / (gamma * sigma ** 2 * T_minus_t)) * np.log(1 + (gamma / beta_hat))
  70. kappa_lsm = (2 / (gamma * sigma ** 2 * T_minus_t)) * np.log(1 + (gamma / beta_lsm))
  71. # 输出结果
  72. logger.info(f"Estimated beta_hat (MLE): {beta_hat}, kappa (MLE): {kappa_mle}")
  73. logger.info(f"Estimated beta_lsm (LSM): {beta_lsm}, kappa (LSM): {kappa_lsm}")