data_processing.py 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260
  1. import json
  2. from decimal import Decimal, getcontext
  3. import pandas as pd
  4. import threading
  5. from collections import deque
  6. from scipy.integrate import trapz
  7. import numpy as np
  8. from scipy.optimize import minimize
  9. from logger_config import logger
# Set the global Decimal precision (significant digits) used by the decimal module.
getcontext().prec = 28
# Rolling buffers filled from the data stream (order book and trade messages).
order_book_snapshots = deque(maxlen=600) # keeps the most recent 600 order book snapshots
spread_delta_snapshots = deque(maxlen=6000) # keeps the most recent 6000 spreads, expressed in multiples of the tick size
trade_snapshots = deque(maxlen=6000) # keeps the most recent 6000 trades
# Event object; it is not referenced elsewhere in the visible part of this file.
stop_event = threading.Event()
# Initial parameters; process_depth_data overwrites them after each successful fit.
k_initial = 6
A_initial = 140
# Parameter ranges handed to the optimizer, in the same (A, k) order as the start vector.
bounds = [(10, 1000.0), # range of A
          (0.01, 100.0)] # range of k
# S0 is the initial reference price; the negative value marks it as "not set yet"
# (process_depth_data replaces it with the first mid price while S0 < 0).
S0 = -1
  25. def on_message(_ws, message):
  26. json_message = json.loads(message)
  27. if json_message["channel"] == "futures.order_book" and json_message["event"] == "all":
  28. on_message_depth(json_message)
  29. elif json_message["channel"] == "futures.trades" and json_message["event"] == "update":
  30. on_message_trade(json_message)
  31. def get_tick_size_from_prices(ask_price, bid_price):
  32. # 获取价格的小数位数
  33. ask_decimal_places = len(str(ask_price).split('.')[1])
  34. bid_decimal_places = len(str(bid_price).split('.')[1])
  35. # 确定最小变动单元
  36. tick_size = 10 ** -max(ask_decimal_places, bid_decimal_places)
  37. return tick_size
  38. def on_message_trade(json_message):
  39. global trade_snapshots
  40. # 解析 Gate.io 的成交信息
  41. for trade_origin in json_message['result']:
  42. price = trade_origin['price']
  43. size = trade_origin['size']
  44. timestamp = pd.to_datetime(trade_origin['create_time_ms'], unit='ms')
  45. # 转换为需要的格式
  46. trade = {
  47. 'price': float(price),
  48. 'qty': float(size),
  49. 'timestamp': timestamp
  50. }
  51. # 买卖方向判断
  52. if trade["qty"] > 0:
  53. trade["side"] = "buy"
  54. else:
  55. trade["side"] = "sell"
  56. trade_snapshots.append(trade)
  57. process_depth_data()
  58. def on_message_depth(json_message):
  59. global order_book_snapshots, spread_delta_snapshots
  60. # 解析 Gate.io 的深度信息
  61. result = json_message['result']
  62. bids = result['bids']
  63. asks = result['asks']
  64. timestamp = pd.to_datetime(result['t'], unit='ms')
  65. # 转换为 Binance 兼容的格式
  66. asks_converted = [[float(ask['p']), float(ask['s'])] for ask in asks[:10]]
  67. bids_converted = [[float(bid['p']), float(bid['s'])] for bid in bids[:10]]
  68. depth = {
  69. 'bids': bids_converted,
  70. 'asks': asks_converted,
  71. 'timestamp': timestamp
  72. }
  73. order_book_snapshots.append(depth)
  74. # 求价差
  75. ask_price = Decimal(str(asks_converted[0][0]))
  76. bid_price = Decimal(str(bids_converted[0][0]))
  77. tick_size = get_tick_size_from_prices(ask_price, bid_price)
  78. spread = float(ask_price - bid_price)
  79. spread_delta = int(spread / tick_size)
  80. spread_delta_snapshots.append(spread_delta)
  81. process_depth_data()
  82. def calculate_phi(prices, k, S0):
  83. """
  84. 计算 φ(k, ξ) 的值
  85. :param prices: 时间序列的价格数据
  86. :param k: 参数 k
  87. :param S0: 初始价格
  88. :return: φ(k, ξ) 的值
  89. """
  90. price_changes = np.array(prices) - S0
  91. exponentials = np.exp(k * price_changes)
  92. phi = np.mean(exponentials)
  93. return phi
  94. def calculate_integral_phi(prices, k, S0, time_points):
  95. """
  96. 计算 ∫ φ(k, ξ) dξ 的值,越大说明波动率也越大(价格波动)。
  97. :param prices: 时间序列的价格数据
  98. :param k: 参数 k
  99. :param S0: 初始价格
  100. :param time_points: 时间点数组
  101. :return: ∫ φ(k, ξ) dξ 的值
  102. """
  103. # 计算每个时间点的 φ(k, ξ) 值
  104. phi_values = [calculate_phi(prices[:i + 1], k, S0) for i in range(len(prices))]
  105. # 使用梯形法计算积分
  106. integral_phi = trapz(phi_values, time_points)
  107. return integral_phi
  108. def estimate_lambda(waiting_times, T):
  109. """
  110. 通过等待时间估计 λ(δ)
  111. :param waiting_times: 等待时间的数组
  112. :param T: 时间窗口的大小
  113. :return: λ(δ) 的估计值
  114. """
  115. # 将 waiting_times 转换为 NumPy 数组
  116. waiting_times = np.array(waiting_times)
  117. sum_indicator = np.sum(waiting_times < T)
  118. sum_waiting_times = int(np.sum(waiting_times).total_seconds() * 1000)
  119. lambda_hat = sum_indicator / sum_waiting_times
  120. return lambda_hat
  121. def objective_function(params, delta_max, log_lambda_hat_value, log_integral_phi_value):
  122. """
  123. 目标函数 r(A, k)
  124. :param params: 包含 A 和 k 的数组
  125. :param delta_max: 最大的价格偏移
  126. :param log_lambda_hat_value: log(λ(δ)) 的值
  127. :param log_integral_phi_value: log(∫ φ(k, ξ) dξ) 的值
  128. :return: 目标函数值
  129. """
  130. A, k = params
  131. residuals = []
  132. for delta in range(1, delta_max + 1):
  133. rst = (log_lambda_hat_value + k * delta - np.log(A) - log_integral_phi_value)
  134. residual = rst ** 2
  135. residuals.append(residual)
  136. return np.sum(residuals)
  137. def calculate_delta_sum(gamma, sigma_squared, T_minus_t, k):
  138. term1 = gamma * sigma_squared * T_minus_t
  139. term2 = (2 / gamma) * np.log(1 + (gamma / k))
  140. delta_sum = term1 + term2
  141. return delta_sum
  142. def calculate_sigma_squared(prices, timestamps):
  143. """
  144. 计算 σ^2 的值
  145. :param prices: 时间序列的价格数据
  146. :param timestamps: 时间戳数组
  147. :return: σ^2 的值
  148. """
  149. n = len(prices)
  150. if n < 2:
  151. return 0.0
  152. time_diff = int(int((timestamps[-1] - timestamps[0]).total_seconds() * 1000) / 100)
  153. price_diff_squared = [(prices[i] - prices[i - 1]) ** 2 for i in range(1, n)]
  154. sigma_squared = np.sum(price_diff_squared) / time_diff
  155. return sigma_squared
  156. def process_depth_data():
  157. global order_book_snapshots, trade_snapshots, spread_delta_snapshots
  158. global k_initial, A_initial, S0
  159. # 数据预热,至少3条深度数据以及10条成交数据才能用于计算
  160. if len(order_book_snapshots) < 3 or len(trade_snapshots) < 10:
  161. return
  162. S_values = [((snapshot['bids'][0][0] + snapshot['asks'][0][0]) / 2) for snapshot in order_book_snapshots]
  163. if S0 < 0:
  164. S0 = S_values[0]
  165. # ========================= 计算 log(∫ φ(k, ξ) dξ) ==================
  166. # 提取时间戳并计算时间间隔
  167. order_book_timestamps = [snapshot['timestamp'] for snapshot in order_book_snapshots]
  168. order_book_time_points = [(timestamp - order_book_timestamps[0]).total_seconds() for timestamp in order_book_timestamps]
  169. # 计算 ∫ φ(k, ξ) dξ
  170. integral_phi_value = calculate_integral_phi(S_values, k_initial, S0, order_book_time_points)
  171. # 计算 log(∫ φ(k, ξ) dξ)
  172. log_integral_phi_value = np.log(integral_phi_value)
  173. # ========================== 估计 λ(δ) ==============================
  174. # 计算等待时间
  175. trade_timestamps = [snapshot['timestamp'] for snapshot in trade_snapshots]
  176. waiting_times = [trade_timestamps[i] - trade_timestamps[i - 1] for i in range(1, len(trade_timestamps))]
  177. # 时间窗口的大小
  178. T = pd.to_datetime(100, unit='ms') - pd.to_datetime(0, unit='ms')
  179. # 计算 λ(δ) 的估计值
  180. lambda_hat = estimate_lambda(waiting_times, T)
  181. # 计算 log(λ(δ))
  182. log_lambda_hat_value = np.log(lambda_hat)
  183. # ========================== 校准 A 和 k =============================
  184. delta_max = np.max(spread_delta_snapshots)
  185. # 优化目标函数以找到最优的 A 和 k
  186. result = minimize(objective_function, np.array([A_initial, k_initial]),
  187. args=(delta_max, log_lambda_hat_value, log_integral_phi_value),
  188. bounds=bounds)
  189. if result.success:
  190. A_initial, k_initial = result.x
  191. # logger.info(f"Optimal k: {k_initial}, delta_max: {delta_max}")
  192. else:
  193. logger.error(result)
  194. # ========================== 计算 σ^2 ==========================
  195. sigma_squared = calculate_sigma_squared(S_values, order_book_timestamps)
  196. # ========================== 计算 δ^a + δ^b ==========================
  197. gamma = 1.0
  198. T_minus_t = 1.0
  199. delta_sum = calculate_delta_sum(gamma, sigma_squared, T_minus_t, k_initial)
  200. logger.info(f"δ^a + δ^b: {delta_sum}")