data_processing.py 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. import json
  2. import pandas as pd
  3. import queue
  4. import threading
  5. from collections import deque
  6. from scipy.integrate import trapz
  7. import numpy as np
  8. from logger_config import logger
  9. # 假设我们有一个数据流,订单簿和成交数据
  10. order_book_snapshots = deque(maxlen=100) # 存储过去1000个订单簿快照
  11. # 数据积累的阈值
  12. DATA_THRESHOLD = 20
  13. messages = queue.Queue() # 创建一个线程安全队列
  14. stop_event = threading.Event()
  15. # 初始参数
  16. k_initial = 0.5
  17. A_initial = 1.0
  18. # 假设S0是初始的参考价格
  19. S0 = -1
  20. sigma = 0.2 # 假设一个波动率
  21. def on_message_depth(_ws, message):
  22. global order_book_snapshots
  23. json_message = json.loads(message)
  24. bids = [[float(price), float(quantity)] for price, quantity in json_message['data']['b'][:10]]
  25. asks = [[float(price), float(quantity)] for price, quantity in json_message['data']['a'][:10]]
  26. timestamp = pd.to_datetime(json_message['data']['E'], unit='ms')
  27. order_book_snapshots.append({
  28. 'bids': bids,
  29. 'asks': asks,
  30. 'timestamp': timestamp
  31. })
  32. if len(order_book_snapshots) >= DATA_THRESHOLD:
  33. process_depth_data(order_book_snapshots)
  34. def calculate_phi(prices, k, S0):
  35. """
  36. 计算 φ(k, ξ) 的值
  37. :param prices: 时间序列的价格数据
  38. :param k: 参数 k
  39. :param S0: 初始价格
  40. :return: φ(k, ξ) 的值
  41. """
  42. price_changes = np.array(prices) - S0
  43. exponentials = np.exp(k * price_changes)
  44. phi = np.mean(exponentials)
  45. return phi
  46. def calculate_integral_phi(prices, k, S0, delta_T):
  47. """
  48. 计算 ∫ φ(k, ξ) dξ 的值
  49. :param prices: 时间序列的价格数据
  50. :param k: 参数 k
  51. :param S0: 初始价格
  52. :param delta_T: 积分的上限
  53. :return: ∫ φ(k, ξ) dξ 的值
  54. """
  55. # 计算每个时间点的 φ(k, ξ) 值
  56. phi_values = [calculate_phi(prices[:i + 1], k, S0) for i in range(len(prices))]
  57. # 创建时间点数组
  58. time_points = np.linspace(0, delta_T, len(phi_values))
  59. # 使用梯形法计算积分
  60. integral_phi = trapz(phi_values, time_points)
  61. return integral_phi
  62. def process_depth_data(order_book_snapshots):
  63. global k_initial, A_initial, S0
  64. S_values = [((snapshot['bids'][0][0] + snapshot['asks'][0][0]) / 2) for snapshot in order_book_snapshots]
  65. if S0 < 0:
  66. S0 = S_values[0]
  67. delta_T = len(S_values) - 1 # 假设每个时间点间隔为1
  68. # 计算 ∫ φ(k, ξ) dξ
  69. integral_phi_value = calculate_integral_phi(S_values, k_initial, S0, delta_T)
  70. # 计算 log(∫ φ(k, ξ) dξ)
  71. log_integral_phi_value = np.log(integral_phi_value)
  72. logger.info("log(∫ φ(k, ξ) dξ) 的值: " + str(log_integral_phi_value))