# data_processing.py
import json
import queue
import threading
from collections import deque

import numpy as np
import pandas as pd

try:
    # SciPy >= 1.6 provides `trapezoid`; the old `trapz` alias was removed in
    # SciPy 1.14, so prefer the new name and keep `trapz` bound for this module.
    from scipy.integrate import trapezoid as trapz
except ImportError:
    from scipy.integrate import trapz

from logger_config import logger
  9. # 假设我们有一个数据流,订单簿和成交数据
  10. order_book_snapshots = deque(maxlen=600) # 存储过去600个订单簿快照
  11. # 数据积累的阈值
  12. DATA_THRESHOLD = 20
  13. messages = queue.Queue() # 创建一个线程安全队列
  14. stop_event = threading.Event()
  15. # 初始参数
  16. k_initial = 0.5
  17. A_initial = 1.0
  18. # 假设S0是初始的参考价格
  19. S0 = -1
  20. sigma = 0.2 # 假设一个波动率
  21. def on_message_depth(_ws, message):
  22. global order_book_snapshots
  23. json_message = json.loads(message)
  24. bids = [[float(price), float(quantity)] for price, quantity in json_message['data']['b'][:10]]
  25. asks = [[float(price), float(quantity)] for price, quantity in json_message['data']['a'][:10]]
  26. timestamp = pd.to_datetime(json_message['data']['E'], unit='ms')
  27. order_book_snapshots.append({
  28. 'bids': bids,
  29. 'asks': asks,
  30. 'timestamp': timestamp
  31. })
  32. if len(order_book_snapshots) >= DATA_THRESHOLD:
  33. process_depth_data(order_book_snapshots)
  34. def calculate_phi(prices, k, S0):
  35. """
  36. 计算 φ(k, ξ) 的值
  37. :param prices: 时间序列的价格数据
  38. :param k: 参数 k
  39. :param S0: 初始价格
  40. :return: φ(k, ξ) 的值
  41. """
  42. price_changes = np.array(prices) - S0
  43. exponentials = np.exp(k * price_changes)
  44. phi = np.mean(exponentials)
  45. return phi
  46. def calculate_integral_phi(prices, k, S0, time_points):
  47. """
  48. 计算 ∫ φ(k, ξ) dξ 的值,越大说明波动率也越大(价格波动)。
  49. :param prices: 时间序列的价格数据
  50. :param k: 参数 k
  51. :param S0: 初始价格
  52. :param time_points: 时间点数组
  53. :return: ∫ φ(k, ξ) dξ 的值
  54. """
  55. # 计算每个时间点的 φ(k, ξ) 值
  56. phi_values = [calculate_phi(prices[:i + 1], k, S0) for i in range(len(prices))]
  57. # 使用梯形法计算积分
  58. integral_phi = trapz(phi_values, time_points)
  59. return integral_phi
  60. def estimate_lambda(waiting_times, T):
  61. """
  62. 通过等待时间估计 λ(δ)
  63. :param waiting_times: 等待时间的数组
  64. :param T: 时间窗口的大小
  65. :return: λ(δ) 的估计值
  66. """
  67. # 将 waiting_times 转换为 NumPy 数组
  68. waiting_times = np.array(waiting_times)
  69. sum_indicator = np.sum(waiting_times < T)
  70. sum_waiting_times = np.sum(waiting_times)
  71. lambda_hat = sum_indicator / sum_waiting_times
  72. return lambda_hat
  73. def process_depth_data(order_book_snapshots):
  74. # 数据预热,至少10条数据才能用于计算
  75. if len(order_book_snapshots) < 10:
  76. return
  77. global k_initial, A_initial, S0
  78. S_values = [((snapshot['bids'][0][0] + snapshot['asks'][0][0]) / 2) for snapshot in order_book_snapshots]
  79. if S0 < 0:
  80. S0 = S_values[0]
  81. # 提取时间戳并计算时间间隔
  82. timestamps = [snapshot['timestamp'] for snapshot in order_book_snapshots]
  83. order_book_time_points = [(timestamp - timestamps[0]).total_seconds() for timestamp in timestamps]
  84. # 计算 ∫ φ(k, ξ) dξ
  85. integral_phi_value = calculate_integral_phi(S_values, k_initial, S0, order_book_time_points)
  86. # 计算 log(∫ φ(k, ξ) dξ)
  87. log_integral_phi_value = np.log(integral_phi_value)
  88. logger.info("log(∫ φ(k, ξ) dξ) 的值: " + str(log_integral_phi_value))
  89. # 计算等待时间
  90. waiting_times = [order_book_time_points[i] - order_book_time_points[i - 1] for i in range(1, len(order_book_time_points))]
  91. T = 0.25 # 时间窗口的大小
  92. # 估计 λ(δ)
  93. lambda_hat = estimate_lambda(waiting_times, T)
  94. log_lambda_hat_value = np.log(lambda_hat)
  95. logger.info("log(λ(δ)) 的值: " + str(log_lambda_hat_value))