data_processing.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. import json
  2. import pandas as pd
  3. import queue
  4. import threading
  5. from collections import deque
  6. from scipy.integrate import trapz
  7. import numpy as np
  8. from logger_config import logger
  9. # 假设我们有一个数据流,订单簿和成交数据
  10. order_book_snapshots = deque(maxlen=600) # 存储过去600个订单簿快照
  11. trade_snapshots = deque(maxlen=6000) # 存储过去6000个成交数据
  12. stop_event = threading.Event()
  13. # 初始参数
  14. k_initial = 0.5
  15. A_initial = 1.0
  16. # 假设S0是初始的参考价格
  17. S0 = -1
  18. def on_message_trade(_ws, message):
  19. global trade_snapshots
  20. json_message = json.loads(message)
  21. trade = {
  22. 'price': float(json_message['data']['p']),
  23. 'qty': float(json_message['data']['q']),
  24. 'timestamp': pd.to_datetime(json_message['data']['T'], unit='ms'),
  25. 'side': 'sell' if json_message['data']['m'] else 'buy'
  26. }
  27. trade_snapshots.append(trade)
  28. process_depth_data()
  29. def on_message_depth(_ws, message):
  30. global order_book_snapshots
  31. json_message = json.loads(message)
  32. bids = [[float(price), float(quantity)] for price, quantity in json_message['data']['b'][:10]]
  33. asks = [[float(price), float(quantity)] for price, quantity in json_message['data']['a'][:10]]
  34. timestamp = pd.to_datetime(json_message['data']['E'], unit='ms')
  35. order_book_snapshots.append({
  36. 'bids': bids,
  37. 'asks': asks,
  38. 'timestamp': timestamp
  39. })
  40. process_depth_data()
  41. def calculate_phi(prices, k, S0):
  42. """
  43. 计算 φ(k, ξ) 的值
  44. :param prices: 时间序列的价格数据
  45. :param k: 参数 k
  46. :param S0: 初始价格
  47. :return: φ(k, ξ) 的值
  48. """
  49. price_changes = np.array(prices) - S0
  50. exponentials = np.exp(k * price_changes)
  51. phi = np.mean(exponentials)
  52. return phi
  53. def calculate_integral_phi(prices, k, S0, time_points):
  54. """
  55. 计算 ∫ φ(k, ξ) dξ 的值,越大说明波动率也越大(价格波动)。
  56. :param prices: 时间序列的价格数据
  57. :param k: 参数 k
  58. :param S0: 初始价格
  59. :param time_points: 时间点数组
  60. :return: ∫ φ(k, ξ) dξ 的值
  61. """
  62. # 计算每个时间点的 φ(k, ξ) 值
  63. phi_values = [calculate_phi(prices[:i + 1], k, S0) for i in range(len(prices))]
  64. # 使用梯形法计算积分
  65. integral_phi = trapz(phi_values, time_points)
  66. return integral_phi
  67. def estimate_lambda(waiting_times, T):
  68. """
  69. 通过等待时间估计 λ(δ)
  70. :param waiting_times: 等待时间的数组
  71. :param T: 时间窗口的大小
  72. :return: λ(δ) 的估计值
  73. """
  74. # 将 waiting_times 转换为 NumPy 数组
  75. waiting_times = np.array(waiting_times)
  76. sum_indicator = np.sum(waiting_times < T)
  77. sum_waiting_times = int(np.sum(waiting_times).total_seconds() * 1000)
  78. lambda_hat = sum_indicator / sum_waiting_times
  79. return lambda_hat
  80. def process_depth_data():
  81. global order_book_snapshots, trade_snapshots
  82. # 数据预热,至少10条深度数据以及100条成交数据才能用于计算
  83. if len(order_book_snapshots) < 10 and len(trade_snapshots) < 100:
  84. return
  85. global k_initial, A_initial, S0
  86. S_values = [((snapshot['bids'][0][0] + snapshot['asks'][0][0]) / 2) for snapshot in order_book_snapshots]
  87. if S0 < 0:
  88. S0 = S_values[0]
  89. # ========================= 计算 log(∫ φ(k, ξ) dξ) ==================
  90. # 提取时间戳并计算时间间隔
  91. order_book_timestamps = [snapshot['timestamp'] for snapshot in order_book_snapshots]
  92. order_book_time_points = [(timestamp - order_book_timestamps[0]).total_seconds() for timestamp in order_book_timestamps]
  93. # 计算 ∫ φ(k, ξ) dξ
  94. integral_phi_value = calculate_integral_phi(S_values, k_initial, S0, order_book_time_points)
  95. # 计算 log(∫ φ(k, ξ) dξ)
  96. log_integral_phi_value = np.log(integral_phi_value)
  97. # ========================== 估计 λ(δ) ==============================
  98. # 计算等待时间
  99. trade_timestamps = [snapshot['timestamp'] for snapshot in trade_snapshots]
  100. waiting_times = [trade_timestamps[i] - trade_timestamps[i - 1] for i in range(1, len(trade_timestamps))]
  101. # 时间窗口的大小
  102. T = pd.to_datetime(100, unit='ms') - pd.to_datetime(0, unit='ms')
  103. lambda_hat = estimate_lambda(waiting_times, T)
  104. # logger.info("λ(δ) 的值: " + str(lambda_hat) + "log(∫ φ(k, ξ) dξ) 的值: " + str(log_integral_phi_value))
  105. log_lambda_hat_value = np.log(lambda_hat)
  106. logger.info("log(λ(δ)) 的值: " + str(log_lambda_hat_value) + "log(∫ φ(k, ξ) dξ) 的值: " + str(log_integral_phi_value))