data_processing.py 2.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. import json
  2. import pandas as pd
  3. import queue
  4. import threading
  5. from collections import deque
  6. from scipy.integrate import trapz
  7. import numpy as np
  8. from logger_config import logger
  9. # 假设我们有一个数据流,订单簿和成交数据
  10. order_book_snapshots = deque(maxlen=100) # 存储过去1000个订单簿快照
  11. # 数据积累的阈值
  12. DATA_THRESHOLD = 20
  13. messages = queue.Queue() # 创建一个线程安全队列
  14. stop_event = threading.Event()
  15. # 初始参数
  16. k_initial = 0.5
  17. A_initial = 1.0
  18. # 假设S0是初始的参考价格
  19. S0 = -1
  20. sigma = 0.2 # 假设一个波动率
  21. def on_message_depth(_ws, message):
  22. global order_book_snapshots
  23. json_message = json.loads(message)
  24. bids = [[float(price), float(quantity)] for price, quantity in json_message['data']['b'][:10]]
  25. asks = [[float(price), float(quantity)] for price, quantity in json_message['data']['a'][:10]]
  26. timestamp = pd.to_datetime(json_message['data']['E'], unit='ms')
  27. order_book_snapshots.append({
  28. 'bids': bids,
  29. 'asks': asks,
  30. 'timestamp': timestamp
  31. })
  32. if len(order_book_snapshots) >= DATA_THRESHOLD:
  33. process_depth_data(order_book_snapshots)
  34. def calculate_phi(prices, k, S0):
  35. """
  36. 计算 φ(k, ξ) 的值
  37. :param prices: 时间序列的价格数据
  38. :param k: 参数 k
  39. :param S0: 初始价格
  40. :return: φ(k, ξ) 的值
  41. """
  42. price_changes = np.array(prices) - S0
  43. exponentials = np.exp(k * price_changes)
  44. phi = np.mean(exponentials)
  45. return phi
  46. def calculate_integral_phi(prices, k, S0, time_points):
  47. """
  48. 计算 ∫ φ(k, ξ) dξ 的值,越大说明波动率也越大(价格波动)。
  49. :param prices: 时间序列的价格数据
  50. :param k: 参数 k
  51. :param S0: 初始价格
  52. :param time_points: 时间点数组
  53. :return: ∫ φ(k, ξ) dξ 的值
  54. """
  55. # 计算每个时间点的 φ(k, ξ) 值
  56. phi_values = [calculate_phi(prices[:i + 1], k, S0) for i in range(len(prices))]
  57. # 使用梯形法计算积分
  58. integral_phi = trapz(phi_values, time_points)
  59. return integral_phi
  60. def process_depth_data(order_book_snapshots):
  61. global k_initial, A_initial, S0
  62. S_values = [((snapshot['bids'][0][0] + snapshot['asks'][0][0]) / 2) for snapshot in order_book_snapshots]
  63. if S0 < 0:
  64. S0 = S_values[0]
  65. # 提取时间戳并计算时间间隔
  66. timestamps = [snapshot['timestamp'] for snapshot in order_book_snapshots]
  67. time_points = [(timestamp - timestamps[0]).total_seconds() for timestamp in timestamps]
  68. # 计算 ∫ φ(k, ξ) dξ
  69. integral_phi_value = calculate_integral_phi(S_values, k_initial, S0, time_points)
  70. # 计算 log(∫ φ(k, ξ) dξ)
  71. log_integral_phi_value = np.log(integral_phi_value)
  72. logger.info("log(∫ φ(k, ξ) dξ) 的值: " + str(log_integral_phi_value))