import json import pandas as pd import queue import threading from collections import deque from scipy.integrate import trapz import numpy as np from logger_config import logger # 假设我们有一个数据流,订单簿和成交数据 order_book_snapshots = deque(maxlen=100) # 存储过去1000个订单簿快照 # 数据积累的阈值 DATA_THRESHOLD = 20 messages = queue.Queue() # 创建一个线程安全队列 stop_event = threading.Event() # 初始参数 k_initial = 0.5 A_initial = 1.0 # 假设S0是初始的参考价格 S0 = -1 sigma = 0.2 # 假设一个波动率 def on_message_depth(_ws, message): global order_book_snapshots json_message = json.loads(message) bids = [[float(price), float(quantity)] for price, quantity in json_message['data']['b'][:10]] asks = [[float(price), float(quantity)] for price, quantity in json_message['data']['a'][:10]] timestamp = pd.to_datetime(json_message['data']['E'], unit='ms') order_book_snapshots.append({ 'bids': bids, 'asks': asks, 'timestamp': timestamp }) if len(order_book_snapshots) >= DATA_THRESHOLD: process_depth_data(order_book_snapshots) def calculate_phi(prices, k, S0): """ 计算 φ(k, ξ) 的值 :param prices: 时间序列的价格数据 :param k: 参数 k :param S0: 初始价格 :return: φ(k, ξ) 的值 """ price_changes = np.array(prices) - S0 exponentials = np.exp(k * price_changes) phi = np.mean(exponentials) return phi def calculate_integral_phi(prices, k, S0, time_points): """ 计算 ∫ φ(k, ξ) dξ 的值,越大说明波动率也越大(价格波动)。 :param prices: 时间序列的价格数据 :param k: 参数 k :param S0: 初始价格 :param time_points: 时间点数组 :return: ∫ φ(k, ξ) dξ 的值 """ # 计算每个时间点的 φ(k, ξ) 值 phi_values = [calculate_phi(prices[:i + 1], k, S0) for i in range(len(prices))] # 使用梯形法计算积分 integral_phi = trapz(phi_values, time_points) return integral_phi def process_depth_data(order_book_snapshots): global k_initial, A_initial, S0 S_values = [((snapshot['bids'][0][0] + snapshot['asks'][0][0]) / 2) for snapshot in order_book_snapshots] if S0 < 0: S0 = S_values[0] # 提取时间戳并计算时间间隔 timestamps = [snapshot['timestamp'] for snapshot in order_book_snapshots] time_points = [(timestamp - timestamps[0]).total_seconds() for timestamp in timestamps] # 计算 ∫ φ(k, ξ) dξ integral_phi_value = calculate_integral_phi(S_values, k_initial, S0, time_points) # 计算 log(∫ φ(k, ξ) dξ) log_integral_phi_value = np.log(integral_phi_value) logger.info("log(∫ φ(k, ξ) dξ) 的值: " + str(log_integral_phi_value))