| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122 |
- import json
- import pandas as pd
- import queue
- import threading
- from collections import deque
- from scipy.integrate import trapz
- import numpy as np
- from logger_config import logger
# Assume we have a data stream: order book and trade data.
# Rolling buffer of order-book snapshots; the deque keeps only the newest 600.
order_book_snapshots = deque(maxlen=600) # stores the last 600 order-book snapshots
# Data-accumulation threshold: on_message_depth only triggers processing once
# the buffer holds at least this many snapshots.
DATA_THRESHOLD = 20
messages = queue.Queue() # create a thread-safe queue
stop_event = threading.Event()
# Initial parameters
k_initial = 0.5
A_initial = 1.0
# S0 is assumed to be the initial reference price. The negative value is a
# "not yet set" sentinel: process_depth_data replaces it with the first
# mid-price it computes.
S0 = -1
sigma = 0.2 # an assumed volatility
def on_message_depth(_ws, message):
    """Parse one depth message, buffer it, and trigger processing when warm.

    The message is a JSON document whose ``data`` object carries ``b`` (bids),
    ``a`` (asks) and ``E`` (event time, interpreted as epoch milliseconds).
    Only the first ten levels of each side are kept, converted to
    ``[price, quantity]`` float pairs.

    :param _ws: connection object supplied by the caller; unused here
    :param message: raw JSON text of the depth message
    """
    payload = json.loads(message)['data']

    def top_levels(side):
        # Keep at most the first ten levels and convert the values to floats.
        return [[float(price), float(quantity)] for price, quantity in payload[side][:10]]

    snapshot = {
        'bids': top_levels('b'),
        'asks': top_levels('a'),
        'timestamp': pd.to_datetime(payload['E'], unit='ms'),
    }
    order_book_snapshots.append(snapshot)

    # Wait until enough snapshots have accumulated before running the estimators.
    if len(order_book_snapshots) < DATA_THRESHOLD:
        return
    process_depth_data(order_book_snapshots)
def calculate_phi(prices, k, S0):
    """Return φ(k, ξ): the sample mean of ``exp(k * (price - S0))``.

    :param prices: price time series (any sequence NumPy can convert)
    :param k: the parameter k scaling the price offsets
    :param S0: reference (initial) price subtracted from every sample
    :return: the value of φ(k, ξ) for the given samples
    """
    offsets = np.array(prices) - S0
    return np.mean(np.exp(k * offsets))
def calculate_integral_phi(prices, k, S0, time_points):
    """Return ∫ φ(k, ξ) dξ integrated over ``time_points``.

    φ at time index ``i`` is the mean of ``exp(k * (price - S0))`` over the
    samples ``prices[0..i]``. The integral is approximated with the
    trapezoidal rule. Fewer than two samples give 0.0.

    :param prices: price time series
    :param k: the parameter k
    :param S0: reference (initial) price
    :param time_points: time coordinate of every price sample (same length)
    :return: the value of ∫ φ(k, ξ) dξ
    :raises ValueError: if ``prices`` and ``time_points`` differ in shape
    """
    price_arr = np.asarray(prices, dtype=float)
    times = np.asarray(time_points, dtype=float)
    if price_arr.shape != times.shape:
        raise ValueError("prices and time_points must have the same length")

    # Running mean of the exponentials gives φ for every prefix in one pass,
    # instead of recomputing the mean over prices[:i + 1] for each i.
    exponentials = np.exp(k * (price_arr - S0))
    phi_values = np.cumsum(exponentials) / np.arange(1, exponentials.size + 1)

    # Trapezoidal rule written out with NumPy so the block does not rely on
    # the `trapz` alias, which newer SciPy releases no longer provide.
    return np.sum(np.diff(times) * (phi_values[1:] + phi_values[:-1])) / 2.0
def estimate_lambda(waiting_times, T):
    """Estimate λ(δ) from observed waiting times.

    The estimate is the number of waiting times strictly below ``T`` divided
    by the sum of all waiting times. When that sum is zero the division is
    carried out by NumPy, so the result is ``nan`` or ``inf`` rather than an
    exception.

    :param waiting_times: sequence of waiting times
    :param T: size of the time window
    :return: the estimate of λ(δ)
    """
    samples = np.array(waiting_times)
    events_within_window = (samples < T).sum()
    total_waiting_time = samples.sum()
    return events_within_window / total_waiting_time
def process_depth_data(order_book_snapshots):
    """Log log(∫ φ(k, ξ) dξ) and log(λ(δ)) for the buffered snapshots.

    Mid-prices are taken from the best bid and best ask of every snapshot.
    On the first call the module-level ``S0`` (negative while unset) is
    initialised to the first mid-price. Waiting times are the gaps between
    consecutive snapshot timestamps, in seconds.

    :param order_book_snapshots: iterable of dicts with ``'bids'``, ``'asks'``
        (lists of ``[price, quantity]``) and ``'timestamp'``
    :return: None; the results are written to the logger
    """
    global S0

    # A snapshot with an empty bid or ask side has no best quote, so no
    # mid-price can be formed from it; skip it instead of raising IndexError.
    usable = [snap for snap in order_book_snapshots if snap['bids'] and snap['asks']]

    # Warm-up: at least 10 usable snapshots are needed for the calculation.
    if len(usable) < 10:
        return

    S_values = [(snap['bids'][0][0] + snap['asks'][0][0]) / 2 for snap in usable]
    if S0 < 0:
        S0 = S_values[0]

    # Seconds elapsed since the first usable snapshot.
    first_timestamp = usable[0]['timestamp']
    order_book_time_points = [
        (snap['timestamp'] - first_timestamp).total_seconds() for snap in usable
    ]

    # log(∫ φ(k, ξ) dξ)
    integral_phi_value = calculate_integral_phi(S_values, k_initial, S0, order_book_time_points)
    log_integral_phi_value = np.log(integral_phi_value)
    logger.info("log(∫ φ(k, ξ) dξ) 的值: " + str(log_integral_phi_value))

    # Waiting times: gaps between consecutive snapshots.
    waiting_times = [
        later - earlier
        for earlier, later in zip(order_book_time_points, order_book_time_points[1:])
    ]
    T = 0.25  # size of the time window

    # log(λ(δ))
    lambda_hat = estimate_lambda(waiting_times, T)
    log_lambda_hat_value = np.log(lambda_hat)
    logger.info("log(λ(δ)) 的值: " + str(log_lambda_hat_value))
|