"""Streaming estimators fed by websocket trade and depth messages.

Every incoming message is appended to a bounded history. Once both histories
are warmed up, ``process_depth_data`` logs log(λ(δ)), estimated from trade
inter-arrival times, and log(∫ φ(k, ξ) dξ), computed from order-book mid
prices.
"""
import json
import queue
import threading
from collections import deque

import numpy as np
import pandas as pd

try:
    # ``trapz`` was removed in SciPy 1.14; ``trapezoid`` is the same routine
    # under its current name. Keep the old local name for existing callers.
    from scipy.integrate import trapezoid as trapz
except ImportError:  # SciPy < 1.6 only ships the old name
    from scipy.integrate import trapz

from logger_config import logger

# Rolling histories of the two input streams.
order_book_snapshots = deque(maxlen=600)  # last 600 order-book snapshots
trade_snapshots = deque(maxlen=6000)  # last 6000 trades

stop_event = threading.Event()

# Initial parameters.
k_initial = 0.5
A_initial = 1.0

# Reference price S0. A negative value means "not set yet"; it is replaced by
# the oldest stored mid price the first time the estimators run.
S0 = -1

# Warm-up thresholds: both histories must reach these sizes before estimating.
_MIN_DEPTH_SNAPSHOTS = 10
_MIN_TRADES = 100

# Window T used when counting "short" waiting times in the λ estimate.
_LAMBDA_WINDOW = pd.Timedelta(milliseconds=100)

_ONE_MILLISECOND = pd.Timedelta(milliseconds=1)


def on_message_trade(_ws, message):
    """Record one trade message and re-run the estimators.

    NOTE(review): the key names look like an exchange trade-stream payload
    (presumably Binance) -- confirm against the subscribing code.

    :param _ws: websocket handle (unused)
    :param message: JSON text whose ``data`` object carries ``p`` (price),
        ``q`` (quantity), ``T`` (timestamp in milliseconds) and ``m``
    """
    data = json.loads(message)['data']
    trade_snapshots.append({
        'price': float(data['p']),
        'qty': float(data['q']),
        'timestamp': pd.to_datetime(data['T'], unit='ms'),
        # A truthy ``m`` is recorded as a sell, anything else as a buy.
        'side': 'sell' if data['m'] else 'buy',
    })
    process_depth_data()


def on_message_depth(_ws, message):
    """Record one order-book message and re-run the estimators.

    Only the first 10 levels of each side are kept. A message with an empty
    bid or ask side is dropped: it has no mid price, and storing it would
    make every later estimator run fail until it is evicted from the history.

    :param _ws: websocket handle (unused)
    :param message: JSON text whose ``data`` object carries ``b`` (bids) and
        ``a`` (asks) as ``[price, quantity]`` pairs and ``E`` (timestamp in
        milliseconds)
    """
    data = json.loads(message)['data']
    bids = [[float(price), float(quantity)]
            for price, quantity in data['b'][:10]]
    asks = [[float(price), float(quantity)]
            for price, quantity in data['a'][:10]]
    if not bids or not asks:
        logger.warning("Skipping depth message with an empty bid or ask side")
        return

    order_book_snapshots.append({
        'bids': bids,
        'asks': asks,
        'timestamp': pd.to_datetime(data['E'], unit='ms'),
    })
    process_depth_data()


def calculate_phi(prices, k, S0):
    """Compute φ(k, ξ) as the mean of exp(k * (price - S0)).

    :param prices: price time series
    :param k: parameter k
    :param S0: reference price
    :return: the value of φ(k, ξ)
    """
    price_changes = np.asarray(prices, dtype=float) - S0
    return np.mean(np.exp(k * price_changes))


def calculate_integral_phi(prices, k, S0, time_points):
    """Compute ∫ φ(k, ξ) dξ with the trapezoidal rule.

    φ at the i-th time point is the mean of exp(k * (price - S0)) over the
    first i + 1 prices, i.e. ``calculate_phi(prices[:i + 1], k, S0)``. A
    larger integral indicates larger price fluctuation.

    :param prices: price time series
    :param k: parameter k
    :param S0: reference price
    :param time_points: time coordinate of each price, same length as prices
    :return: the value of ∫ φ(k, ξ) dξ
    """
    exponentials = np.exp(k * (np.asarray(prices, dtype=float) - S0))
    # Running mean over every prefix in one pass instead of recomputing the
    # mean from scratch for each prefix (which is quadratic in len(prices)).
    prefix_sizes = np.arange(1, exponentials.size + 1)
    phi_values = np.cumsum(exponentials) / prefix_sizes
    return trapz(phi_values, time_points)


def estimate_lambda(waiting_times, T):
    """Estimate λ(δ) from waiting times between consecutive trades.

    The estimate is the number of waiting times shorter than ``T`` divided by
    the sum of all waiting times expressed in whole milliseconds.

    :param waiting_times: sequence of time deltas (e.g. ``pd.Timedelta``)
    :param T: window size as a time delta
    :return: the estimate of λ(δ); ``nan`` when there are no waiting times,
        and ``inf`` (or ``nan`` if no wait is shorter than ``T``) when the
        waiting times sum to less than one millisecond
    """
    waits = pd.to_timedelta(list(waiting_times))
    short_wait_count = int((waits < T).sum())
    # Integer floor division keeps the millisecond total exact; going through
    # float seconds (total_seconds() * 1000) can truncate one unit too low.
    total_ms = waits.sum() // _ONE_MILLISECOND
    if total_ms == 0:
        return float('inf') if short_wait_count else float('nan')
    return short_wait_count / total_ms


def process_depth_data():
    """Log log(λ(δ)) and log(∫ φ(k, ξ) dξ) for the current histories.

    Does nothing until at least 10 order-book snapshots and at least 100
    trades have been collected. On the first run after warm-up the module
    level ``S0`` is set to the oldest stored mid price.
    """
    global S0

    # Warm-up: both histories are required, so bail out if either is short.
    if (len(order_book_snapshots) < _MIN_DEPTH_SNAPSHOTS
            or len(trade_snapshots) < _MIN_TRADES):
        return

    # Mid price of every stored snapshot: (best bid + best ask) / 2.
    mid_prices = [(snapshot['bids'][0][0] + snapshot['asks'][0][0]) / 2
                  for snapshot in order_book_snapshots]
    if S0 < 0:
        S0 = mid_prices[0]

    # ------------------- log(∫ φ(k, ξ) dξ) -------------------
    # Time axis: seconds elapsed since the oldest stored snapshot.
    book_times = [snapshot['timestamp'] for snapshot in order_book_snapshots]
    first_time = book_times[0]
    elapsed_seconds = [(stamp - first_time).total_seconds()
                       for stamp in book_times]
    integral_phi_value = calculate_integral_phi(
        mid_prices, k_initial, S0, elapsed_seconds)
    log_integral_phi_value = np.log(integral_phi_value)

    # ------------------------ log(λ(δ)) ------------------------
    # Waiting time = gap between each trade and the one stored before it.
    trade_times = [trade['timestamp'] for trade in trade_snapshots]
    waiting_times = [later - earlier
                     for earlier, later in zip(trade_times, trade_times[1:])]
    lambda_hat = estimate_lambda(waiting_times, _LAMBDA_WINDOW)
    log_lambda_hat_value = np.log(lambda_hat)

    # NOTE: the message text (including the missing separator between the
    # first value and the second label) is kept exactly as it always was.
    logger.info("log(λ(δ)) 的值: " + str(log_lambda_hat_value)
                + "log(∫ φ(k, ξ) dξ) 的值: " + str(log_integral_phi_value))