| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293 |
- import json
- import pandas as pd
- import queue
- import threading
- from collections import deque
- from scipy.integrate import trapz
- import numpy as np
- from logger_config import logger
- # 假设我们有一个数据流,订单簿和成交数据
- order_book_snapshots = deque(maxlen=100) # 存储过去1000个订单簿快照
- # 数据积累的阈值
- DATA_THRESHOLD = 20
- messages = queue.Queue() # 创建一个线程安全队列
- stop_event = threading.Event()
- # 初始参数
- k_initial = 0.5
- A_initial = 1.0
- # 假设S0是初始的参考价格
- S0 = -1
- sigma = 0.2 # 假设一个波动率
- def on_message_depth(_ws, message):
- global order_book_snapshots
- json_message = json.loads(message)
- bids = [[float(price), float(quantity)] for price, quantity in json_message['data']['b'][:10]]
- asks = [[float(price), float(quantity)] for price, quantity in json_message['data']['a'][:10]]
- timestamp = pd.to_datetime(json_message['data']['E'], unit='ms')
- order_book_snapshots.append({
- 'bids': bids,
- 'asks': asks,
- 'timestamp': timestamp
- })
- if len(order_book_snapshots) >= DATA_THRESHOLD:
- process_depth_data(order_book_snapshots)
- def calculate_phi(prices, k, S0):
- """
- 计算 φ(k, ξ) 的值
- :param prices: 时间序列的价格数据
- :param k: 参数 k
- :param S0: 初始价格
- :return: φ(k, ξ) 的值
- """
- price_changes = np.array(prices) - S0
- exponentials = np.exp(k * price_changes)
- phi = np.mean(exponentials)
- return phi
- def calculate_integral_phi(prices, k, S0, delta_T):
- """
- 计算 ∫ φ(k, ξ) dξ 的值
- :param prices: 时间序列的价格数据
- :param k: 参数 k
- :param S0: 初始价格
- :param delta_T: 积分的上限
- :return: ∫ φ(k, ξ) dξ 的值
- """
- # 计算每个时间点的 φ(k, ξ) 值
- phi_values = [calculate_phi(prices[:i + 1], k, S0) for i in range(len(prices))]
- # 创建时间点数组
- time_points = np.linspace(0, delta_T, len(phi_values))
- # 使用梯形法计算积分
- integral_phi = trapz(phi_values, time_points)
- return integral_phi
- def process_depth_data(order_book_snapshots):
- global k_initial, A_initial, S0
- S_values = [((snapshot['bids'][0][0] + snapshot['asks'][0][0]) / 2) for snapshot in order_book_snapshots]
- if S0 < 0:
- S0 = S_values[0]
- delta_T = len(S_values) - 1 # 假设每个时间点间隔为1
- # 计算 ∫ φ(k, ξ) dξ
- integral_phi_value = calculate_integral_phi(S_values, k_initial, S0, delta_T)
- # 计算 log(∫ φ(k, ξ) dξ)
- log_integral_phi_value = np.log(integral_phi_value)
- logger.info("log(∫ φ(k, ξ) dξ) 的值: " + str(log_integral_phi_value))
|