import json
import queue
import threading
from collections import deque

import numpy as np
import pandas as pd

try:
    # `trapz` was removed from scipy.integrate in SciPy 1.14; `trapezoid` is
    # the same routine under its current name. Keep the local name `trapz`.
    from scipy.integrate import trapezoid as trapz
except ImportError:  # older SciPy that only ships the legacy name
    from scipy.integrate import trapz

from logger_config import logger

# Rolling window of the most recent order-book snapshots (at most 600).
order_book_snapshots = deque(maxlen=600)

# Number of buffered snapshots required before on_message_depth starts
# calling process_depth_data.
DATA_THRESHOLD = 20

# Warm-up size enforced by process_depth_data itself.
MIN_SNAPSHOTS = 10

messages = queue.Queue()  # thread-safe queue
stop_event = threading.Event()

# Initial parameters.
k_initial = 0.5
A_initial = 1.0

# Reference price S0. A negative value means "not initialised yet";
# process_depth_data replaces it with the first observed mid-price.
S0 = -1
sigma = 0.2  # assumed volatility


def on_message_depth(_ws, message):
    """Parse one depth message, buffer it, and run the estimators when ready.

    The message is JSON text whose ``data`` object carries ``b`` (bids) and
    ``a`` (asks) as ``[price, quantity]`` pairs and ``E`` as a timestamp in
    milliseconds. Only the first 10 levels of each side are kept.

    :param _ws: connection object supplied by the caller; unused here
    :param message: raw JSON text of the depth message
    """
    data = json.loads(message)['data']
    bids = [[float(price), float(quantity)] for price, quantity in data['b'][:10]]
    asks = [[float(price), float(quantity)] for price, quantity in data['a'][:10]]

    order_book_snapshots.append({
        'bids': bids,
        'asks': asks,
        'timestamp': pd.to_datetime(data['E'], unit='ms'),
    })

    if len(order_book_snapshots) >= DATA_THRESHOLD:
        process_depth_data(order_book_snapshots)


def calculate_phi(prices, k, S0):
    """Compute φ(k, ξ): the mean of exp(k * (price - S0)) over ``prices``.

    :param prices: time series of prices
    :param k: parameter k
    :param S0: reference price
    :return: the value of φ(k, ξ)
    """
    price_changes = np.asarray(prices) - S0
    return np.mean(np.exp(k * price_changes))


def calculate_integral_phi(prices, k, S0, time_points):
    """Compute ∫ φ(k, ξ) dξ with the trapezoidal rule.

    φ at the i-th time point is the mean of exp(k * (price - S0)) over the
    first i + 1 prices, i.e. a running mean. The original comment notes that
    a larger integral indicates larger price fluctuation.

    :param prices: time series of prices
    :param k: parameter k
    :param S0: reference price
    :param time_points: time coordinate of each price, same length as prices
    :return: the value of ∫ φ(k, ξ) dξ
    """
    exponentials = np.exp(k * (np.asarray(prices, dtype=float) - S0))
    # Running mean via a cumulative sum: one O(n) pass instead of
    # recomputing the mean of every prefix from scratch.
    phi_values = np.cumsum(exponentials) / np.arange(1, exponentials.size + 1)
    return trapz(phi_values, time_points)


def estimate_lambda(waiting_times, T):
    """Estimate λ(δ) from waiting times.

    The estimate is the number of waiting times shorter than ``T`` divided by
    the sum of all waiting times.

    :param waiting_times: array-like of waiting times
    :param T: size of the time window
    :return: the estimate of λ(δ); NaN when there are no waiting times or
        their sum is not positive, because the ratio is undefined then
    """
    waits = np.asarray(waiting_times, dtype=float)
    total_wait = waits.sum()
    if waits.size == 0 or total_wait <= 0:
        # Avoid 0/0 or n/0 (and the accompanying RuntimeWarning).
        return float('nan')
    return np.count_nonzero(waits < T) / total_wait


def process_depth_data(order_book_snapshots):
    """Log log(∫ φ(k, ξ) dξ) and log(λ(δ)) for the buffered snapshots.

    Mid-prices are taken from the first bid and first ask of each snapshot.
    On first use the module-level ``S0`` is set to the first mid-price.

    :param order_book_snapshots: iterable of dicts with ``bids``, ``asks``
        and ``timestamp`` keys, as stored by on_message_depth
    """
    global k_initial, A_initial, S0

    # A mid-price needs both sides of the book; skip snapshots where either
    # side is empty instead of failing with IndexError.
    usable = [snap for snap in order_book_snapshots if snap['bids'] and snap['asks']]

    # Warm-up: at least MIN_SNAPSHOTS usable snapshots are required.
    if len(usable) < MIN_SNAPSHOTS:
        return

    S_values = [(snap['bids'][0][0] + snap['asks'][0][0]) / 2 for snap in usable]
    if S0 < 0:
        S0 = S_values[0]
    # NOTE(review): S0 is set once and never refreshed here; if prices drift
    # far from it, exp(k * (S - S0)) can overflow to inf. Confirm whether S0
    # should be re-anchored.

    # Seconds elapsed since the first usable snapshot.
    first_timestamp = usable[0]['timestamp']
    order_book_time_points = [
        (snap['timestamp'] - first_timestamp).total_seconds() for snap in usable
    ]

    # Compute ∫ φ(k, ξ) dξ and its logarithm.
    integral_phi_value = calculate_integral_phi(S_values, k_initial, S0, order_book_time_points)
    log_integral_phi_value = np.log(integral_phi_value)
    logger.info(f"log(∫ φ(k, ξ) dξ) 的值: {log_integral_phi_value}")

    # Waiting times are the gaps between consecutive snapshots.
    waiting_times = np.diff(order_book_time_points)
    T = 0.25  # size of the time window

    # Estimate λ(δ) and its logarithm.
    lambda_hat = estimate_lambda(waiting_times, T)
    log_lambda_hat_value = np.log(lambda_hat)
    logger.info(f"log(λ(δ)) 的值: {log_lambda_hat_value}")