| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192 |
- import json
- import pandas as pd
- import queue
- import threading
- from collections import deque
- from scipy.integrate import trapz
- import numpy as np
- from logger_config import logger
- # 假设我们有一个数据流,订单簿和成交数据
- order_book_snapshots = deque(maxlen=100) # 存储过去1000个订单簿快照
- # 数据积累的阈值
- DATA_THRESHOLD = 20
- messages = queue.Queue() # 创建一个线程安全队列
- stop_event = threading.Event()
- # 初始参数
- k_initial = 0.5
- A_initial = 1.0
- # 假设S0是初始的参考价格
- S0 = -1
- sigma = 0.2 # 假设一个波动率
- def on_message_depth(_ws, message):
- global order_book_snapshots
- json_message = json.loads(message)
- bids = [[float(price), float(quantity)] for price, quantity in json_message['data']['b'][:10]]
- asks = [[float(price), float(quantity)] for price, quantity in json_message['data']['a'][:10]]
- timestamp = pd.to_datetime(json_message['data']['E'], unit='ms')
- order_book_snapshots.append({
- 'bids': bids,
- 'asks': asks,
- 'timestamp': timestamp
- })
- if len(order_book_snapshots) >= DATA_THRESHOLD:
- process_depth_data(order_book_snapshots)
- def calculate_phi(prices, k, S0):
- """
- 计算 φ(k, ξ) 的值
- :param prices: 时间序列的价格数据
- :param k: 参数 k
- :param S0: 初始价格
- :return: φ(k, ξ) 的值
- """
- price_changes = np.array(prices) - S0
- exponentials = np.exp(k * price_changes)
- phi = np.mean(exponentials)
- return phi
- def calculate_integral_phi(prices, k, S0, time_points):
- """
- 计算 ∫ φ(k, ξ) dξ 的值,越大说明波动率也越大(价格波动)。
- :param prices: 时间序列的价格数据
- :param k: 参数 k
- :param S0: 初始价格
- :param time_points: 时间点数组
- :return: ∫ φ(k, ξ) dξ 的值
- """
- # 计算每个时间点的 φ(k, ξ) 值
- phi_values = [calculate_phi(prices[:i + 1], k, S0) for i in range(len(prices))]
- # 使用梯形法计算积分
- integral_phi = trapz(phi_values, time_points)
- return integral_phi
- def process_depth_data(order_book_snapshots):
- global k_initial, A_initial, S0
- S_values = [((snapshot['bids'][0][0] + snapshot['asks'][0][0]) / 2) for snapshot in order_book_snapshots]
- if S0 < 0:
- S0 = S_values[0]
- # 提取时间戳并计算时间间隔
- timestamps = [snapshot['timestamp'] for snapshot in order_book_snapshots]
- time_points = [(timestamp - timestamps[0]).total_seconds() for timestamp in timestamps]
- # 计算 ∫ φ(k, ξ) dξ
- integral_phi_value = calculate_integral_phi(S_values, k_initial, S0, time_points)
- # 计算 log(∫ φ(k, ξ) dξ)
- log_integral_phi_value = np.log(integral_phi_value)
- logger.info("log(∫ φ(k, ξ) dξ) 的值: " + str(log_integral_phi_value))
|