| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134 |
- import json
- import pandas as pd
- import queue
- import threading
- from collections import deque
- from scipy.integrate import trapz
- import numpy as np
- from logger_config import logger
# Rolling buffers filled by the websocket callbacks: order-book snapshots and trades.
order_book_snapshots: deque = deque(maxlen=600)  # the most recent 600 order-book snapshots
trade_snapshots: deque = deque(maxlen=6000)  # the most recent 6000 trades
stop_event: threading.Event = threading.Event()

# Initial model parameters.
k_initial: float = 0.5
A_initial: float = 1.0

# Reference price S0. A negative value is a "not set yet" sentinel:
# process_depth_data replaces it with the first observed mid price.
S0 = -1
def on_message_trade(_ws, message):
    """Websocket callback for trade messages.

    Parses the JSON text, appends a normalized trade record to the
    module-level ``trade_snapshots`` buffer, then calls ``process_depth_data()``.

    :param _ws: websocket handle (unused).
    :param message: raw JSON text. Fields are read from its ``data`` object:
        ``p`` (price), ``q`` (quantity), ``T`` (epoch milliseconds) and ``m``
        (boolean flag mapped to the trade side).
    """
    payload = json.loads(message)['data']
    trade_snapshots.append({
        'price': float(payload['p']),
        'qty': float(payload['q']),
        'timestamp': pd.to_datetime(payload['T'], unit='ms'),
        # NOTE(review): 'm' is presumably the exchange's "buyer is maker" flag,
        # i.e. a truthy value means the aggressor sold — confirm against the feed.
        'side': 'sell' if payload['m'] else 'buy',
    })
    process_depth_data()
def on_message_depth(_ws, message):
    """Websocket callback for order-book depth messages.

    Parses the JSON text and appends a snapshot to the module-level
    ``order_book_snapshots`` buffer, then calls ``process_depth_data()``.
    Each snapshot is a dict with ``bids`` and ``asks`` (each a list of up to
    10 ``[price, quantity]`` float pairs) and ``timestamp``.

    :param _ws: websocket handle (unused).
    :param message: raw JSON text. Fields are read from its ``data`` object:
        ``b`` (bid levels), ``a`` (ask levels) and ``E`` (epoch milliseconds).
    """
    payload = json.loads(message)['data']

    def _top_levels(levels):
        # First 10 levels, each converted to a [price, quantity] pair of floats.
        # Presumably ordered best-first: process_depth_data treats index 0 as
        # the top of the book — TODO confirm against the feed.
        return [[float(level_price), float(level_qty)] for level_price, level_qty in levels[:10]]

    bid_levels = _top_levels(payload['b'])
    ask_levels = _top_levels(payload['a'])
    event_time = pd.to_datetime(payload['E'], unit='ms')
    order_book_snapshots.append({
        'bids': bid_levels,
        'asks': ask_levels,
        'timestamp': event_time,
    })
    process_depth_data()
def calculate_phi(prices, k, S0):
    """Return phi(k, xi): the sample mean of ``exp(k * (price - S0))``.

    :param prices: price series (time-ordered price data).
    :param k: exponent scale parameter ``k``.
    :param S0: reference price subtracted from every price.
    :return: the value of phi(k, xi), as a NumPy scalar.
    """
    deviations = np.array(prices) - S0
    return np.exp(k * deviations).mean()
def calculate_integral_phi(prices, k, S0, time_points):
    """Return the trapezoidal integral of phi(k, xi) over *time_points*.

    The integrand at time point ``i`` is phi evaluated on the prefix
    ``prices[:i + 1]``, i.e. the running mean of ``exp(k * (price - S0))``.
    A larger value is intended to indicate larger price volatility.

    :param prices: price series (time-ordered), one value per time point.
    :param k: exponent scale parameter ``k``.
    :param S0: reference price subtracted from every price.
    :param time_points: integration abscissae, same length as *prices*.
    :return: the value of the integral of phi(k, xi) dxi; ``0.0`` for empty input.
    """
    # `scipy.integrate.trapz` was removed in SciPy 1.14; `trapezoid` (SciPy >= 1.6)
    # is the same trapezoidal rule.
    from scipy.integrate import trapezoid

    exponentials = np.exp(k * (np.asarray(prices, dtype=float) - S0))
    # Running mean of the exponentials gives phi for every prefix in a single
    # O(n) pass, instead of re-averaging each prefix from scratch (O(n^2)).
    prefix_sizes = np.arange(1, exponentials.size + 1)
    phi_values = np.cumsum(exponentials) / prefix_sizes
    return trapezoid(phi_values, time_points)
def estimate_lambda(waiting_times, T):
    """Estimate lambda(delta) from waiting times between consecutive trades.

    ``lambda_hat = (number of waiting times strictly shorter than T)
    / (total waiting time in milliseconds)``

    :param waiting_times: iterable of timedelta-like values (e.g. ``pd.Timedelta``
        or ``datetime.timedelta``) supporting ``<``, ``+`` and ``total_seconds()``.
    :param T: window size, a timedelta-like value comparable with the waiting times.
    :return: the estimate, per millisecond. Returns NaN when there are no waiting
        times or their total duration is not positive, because the ratio is undefined
        (previously this raised ZeroDivisionError / AttributeError).
    """
    waiting_times = list(waiting_times)
    if not waiting_times:
        return float('nan')

    sum_indicator = sum(1 for wait in waiting_times if wait < T)
    # Sum the timedeltas exactly first, then convert once.
    total_wait = sum(waiting_times[1:], waiting_times[0])
    # Keep fractional milliseconds: int() truncation discarded sub-ms time and
    # could be off by one when total_seconds() * 1000 lands just below an integer.
    sum_waiting_ms = total_wait.total_seconds() * 1000
    if sum_waiting_ms <= 0:
        return float('nan')
    return sum_indicator / sum_waiting_ms
def process_depth_data():
    """Compute and log log(lambda(delta)) and log(integral of phi(k, xi) dxi).

    Reads the module-level ``order_book_snapshots`` and ``trade_snapshots``
    buffers and ``k_initial``. Initializes the global ``S0`` from the first
    mid price the first time enough data is available, then logs both values.
    Returns without doing anything until the warm-up thresholds are met.
    """
    global S0
    # Warm-up: need at least 10 depth snapshots AND at least 100 trades. Both
    # thresholds must hold; with only one satisfied, an empty buffer would make
    # S_values[0] or the waiting-time estimate fail.
    if len(order_book_snapshots) < 10 or len(trade_snapshots) < 100:
        return

    # Take one copy of each buffer so every derived list comes from the same set
    # of snapshots. NOTE(review): this matters if the websocket callbacks run on
    # different threads and append while we compute — confirm the threading model.
    books = list(order_book_snapshots)
    trades = list(trade_snapshots)

    # Mid price of each snapshot: (best bid + best ask) / 2.
    S_values = [(book['bids'][0][0] + book['asks'][0][0]) / 2 for book in books]
    if S0 < 0:  # negative S0 is the "not set yet" sentinel
        S0 = S_values[0]

    # ---- log(∫ φ(k, ξ) dξ) ----
    # Seconds elapsed since the oldest snapshot in the window.
    first_book_time = books[0]['timestamp']
    book_time_points = [(book['timestamp'] - first_book_time).total_seconds() for book in books]
    integral_phi_value = calculate_integral_phi(S_values, k_initial, S0, book_time_points)
    log_integral_phi_value = np.log(integral_phi_value)

    # ---- λ(δ) ----
    # Waiting times between consecutive trades.
    trade_times = [trade['timestamp'] for trade in trades]
    waiting_times = [later - earlier for earlier, later in zip(trade_times, trade_times[1:])]
    T = pd.Timedelta(milliseconds=100)  # window size for the "waiting time < T" indicator
    lambda_hat = estimate_lambda(waiting_times, T)
    log_lambda_hat_value = np.log(lambda_hat)

    logger.info(
        "log(λ(δ)) 的值: %s, log(∫ φ(k, ξ) dξ) 的值: %s",
        log_lambda_hat_value,
        log_integral_phi_value,
    )
|