data_processing.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. import json
  2. import pandas as pd
  3. import threading
  4. from collections import deque
  5. from scipy.integrate import trapz
  6. import numpy as np
  7. from logger_config import logger
  8. # 假设我们有一个数据流,订单簿和成交数据
  9. order_book_snapshots = deque(maxlen=600) # 存储过去600个订单簿快照
  10. trade_snapshots = deque(maxlen=6000) # 存储过去6000个成交数据
  11. stop_event = threading.Event()
  12. # 初始参数
  13. k_initial = 0.5
  14. A_initial = 1.0
  15. # 假设S0是初始的参考价格
  16. S0 = -1
  17. def on_message_trade(_ws, message):
  18. global trade_snapshots
  19. json_message = json.loads(message)
  20. trade = {
  21. 'price': float(json_message['data']['p']),
  22. 'qty': float(json_message['data']['q']),
  23. 'timestamp': pd.to_datetime(json_message['data']['T'], unit='ms'),
  24. 'side': 'sell' if json_message['data']['m'] else 'buy'
  25. }
  26. trade_snapshots.append(trade)
  27. process_depth_data()
  28. def on_message_depth(_ws, message):
  29. global order_book_snapshots
  30. json_message = json.loads(message)
  31. bids = [[float(price), float(quantity)] for price, quantity in json_message['data']['b'][:10]]
  32. asks = [[float(price), float(quantity)] for price, quantity in json_message['data']['a'][:10]]
  33. timestamp = pd.to_datetime(json_message['data']['E'], unit='ms')
  34. order_book_snapshots.append({
  35. 'bids': bids,
  36. 'asks': asks,
  37. 'timestamp': timestamp
  38. })
  39. process_depth_data()
  40. def calculate_phi(prices, k, S0):
  41. """
  42. 计算 φ(k, ξ) 的值
  43. :param prices: 时间序列的价格数据
  44. :param k: 参数 k
  45. :param S0: 初始价格
  46. :return: φ(k, ξ) 的值
  47. """
  48. price_changes = np.array(prices) - S0
  49. exponentials = np.exp(k * price_changes)
  50. phi = np.mean(exponentials)
  51. return phi
  52. def calculate_integral_phi(prices, k, S0, time_points):
  53. """
  54. 计算 ∫ φ(k, ξ) dξ 的值,越大说明波动率也越大(价格波动)。
  55. :param prices: 时间序列的价格数据
  56. :param k: 参数 k
  57. :param S0: 初始价格
  58. :param time_points: 时间点数组
  59. :return: ∫ φ(k, ξ) dξ 的值
  60. """
  61. # 计算每个时间点的 φ(k, ξ) 值
  62. phi_values = [calculate_phi(prices[:i + 1], k, S0) for i in range(len(prices))]
  63. # 使用梯形法计算积分
  64. integral_phi = trapz(phi_values, time_points)
  65. return integral_phi
  66. def estimate_lambda(waiting_times, T):
  67. """
  68. 通过等待时间估计 λ(δ)
  69. :param waiting_times: 等待时间的数组
  70. :param T: 时间窗口的大小
  71. :return: λ(δ) 的估计值
  72. """
  73. # 将 waiting_times 转换为 NumPy 数组
  74. waiting_times = np.array(waiting_times)
  75. sum_indicator = np.sum(waiting_times < T)
  76. sum_waiting_times = int(np.sum(waiting_times).total_seconds() * 1000)
  77. lambda_hat = sum_indicator / sum_waiting_times
  78. return lambda_hat
  79. def process_depth_data():
  80. global order_book_snapshots, trade_snapshots
  81. # 数据预热,至少10条深度数据以及100条成交数据才能用于计算
  82. if len(order_book_snapshots) < 10 and len(trade_snapshots) < 100:
  83. return
  84. global k_initial, A_initial, S0
  85. S_values = [((snapshot['bids'][0][0] + snapshot['asks'][0][0]) / 2) for snapshot in order_book_snapshots]
  86. if S0 < 0:
  87. S0 = S_values[0]
  88. # ========================= 计算 log(∫ φ(k, ξ) dξ) ==================
  89. # 提取时间戳并计算时间间隔
  90. order_book_timestamps = [snapshot['timestamp'] for snapshot in order_book_snapshots]
  91. order_book_time_points = [(timestamp - order_book_timestamps[0]).total_seconds() for timestamp in order_book_timestamps]
  92. # 计算 ∫ φ(k, ξ) dξ
  93. integral_phi_value = calculate_integral_phi(S_values, k_initial, S0, order_book_time_points)
  94. # 计算 log(∫ φ(k, ξ) dξ)
  95. log_integral_phi_value = np.log(integral_phi_value)
  96. # ========================== 估计 λ(δ) ==============================
  97. # 计算等待时间
  98. trade_timestamps = [snapshot['timestamp'] for snapshot in trade_snapshots]
  99. waiting_times = [trade_timestamps[i] - trade_timestamps[i - 1] for i in range(1, len(trade_timestamps))]
  100. # 时间窗口的大小
  101. T = pd.to_datetime(100, unit='ms') - pd.to_datetime(0, unit='ms')
  102. lambda_hat = estimate_lambda(waiting_times, T)
  103. # logger.info("λ(δ) 的值: " + str(lambda_hat) + "log(∫ φ(k, ξ) dξ) 的值: " + str(log_integral_phi_value))
  104. log_lambda_hat_value = np.log(lambda_hat)
  105. logger.info("log(λ(δ)) 的值: " + str(log_lambda_hat_value) + "log(∫ φ(k, ξ) dξ) 的值: " + str(log_integral_phi_value))