| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200 |
import json
import threading
from collections import deque
from decimal import Decimal, getcontext

import numpy as np
import pandas as pd
from scipy.optimize import minimize

try:
    # SciPy >= 1.6 name; `trapz` was removed from scipy.integrate in SciPy 1.14.
    from scipy.integrate import trapezoid
    trapz = trapezoid  # keep the legacy name bound for any remaining users
except ImportError:
    from scipy.integrate import trapz
    trapezoid = trapz

from logger_config import logger
# Set the global Decimal precision.
getcontext().prec = 28
# Rolling buffers filled by the stream callbacks below (order book, spreads, trades).
order_book_snapshots = deque(maxlen=600)  # keeps the most recent 600 order-book snapshots
spread_delta_snapshots = deque(maxlen=6000)  # keeps the most recent 6000 spreads (in multiples of the tick size)
trade_snapshots = deque(maxlen=6000)  # keeps the most recent 6000 trades
stop_event = threading.Event()
# Initial parameter values; process_depth_data overwrites them after each successful fit.
k_initial = 10
A_initial = 100
# Parameter bounds passed to the optimizer, in the same (A, k) order as the parameter vector.
bounds = [(10, 1000.0),  # range of A
          (0.01, 100.0)]  # range of k
# S0 is the reference price; a negative value means it has not been initialised yet.
S0 = -1
def _decimal_places(price):
    """Return the number of digits after the decimal point of *price* (never negative)."""
    # Going through Decimal's exponent instead of splitting str(price) on '.'
    # handles integers ("100"), scientific notation ("1.23E-7") and floats alike.
    exponent = Decimal(str(price)).as_tuple().exponent
    if not isinstance(exponent, int):
        # NaN / Infinity report a non-integer exponent marker.
        raise ValueError(f"price must be a finite number, got {price!r}")
    return max(-exponent, 0)


def get_tick_size_from_prices(ask_price, bid_price):
    """
    Infer the tick size from the number of decimal places of two prices.

    The finer of the two prices decides the result: with ``n`` the larger
    number of decimal places, the tick size is ``10 ** -n``.

    :param ask_price: best ask price (Decimal, float, int or numeric string)
    :param bid_price: best bid price (Decimal, float, int or numeric string)
    :return: ``10 ** -n``; this is the int ``1`` when neither price has
             decimal places, otherwise a float such as ``0.01``
    :raises ValueError: if a price is NaN or infinite
    """
    places = max(_decimal_places(ask_price), _decimal_places(bid_price))
    return 10 ** -places
def on_message_trade(_ws, message):
    """
    Stream callback for a trade message.

    Decodes the JSON payload, stores one trade record in ``trade_snapshots``
    and then triggers a recalibration via ``process_depth_data``.

    :param _ws: connection object handed over by the stream client (unused)
    :param message: raw JSON text whose ``data`` object carries the fields
                    ``p`` (price), ``q`` (quantity), ``T`` (time in ms) and ``m``
    """
    payload = json.loads(message)['data']
    # Fields are read in the order p, q, T, m so a malformed message fails
    # on the same key as before.
    record = {
        'price': float(payload['p']),
        'qty': float(payload['q']),
        'timestamp': pd.to_datetime(payload['T'], unit='ms'),
        # A truthy `m` flag is recorded as a sell, anything else as a buy.
        # NOTE(review): presumably the exchange's "buyer is maker" flag - confirm.
        'side': 'sell' if payload['m'] else 'buy',
    }
    trade_snapshots.append(record)
    process_depth_data()
def on_message_depth(_ws, message):
    """
    Stream callback for an order-book depth message.

    Stores the top ten bid/ask levels as a snapshot, records the current
    spread as a whole number of ticks, then triggers a recalibration via
    ``process_depth_data``.

    :param _ws: connection object handed over by the stream client (unused)
    :param message: raw JSON text whose ``data`` object carries ``b`` (bids),
                    ``a`` (asks) as ``[price, quantity]`` pairs and ``E`` (time in ms)
    """
    data = json.loads(message)['data']
    bids = [[float(price), float(quantity)] for price, quantity in data['b'][:10]]
    asks = [[float(price), float(quantity)] for price, quantity in data['a'][:10]]
    timestamp = pd.to_datetime(data['E'], unit='ms')

    if not bids or not asks:
        # A one-sided book has no spread or mid price; storing it would only
        # make the later best-bid/best-ask lookups fail.
        logger.warning("Skipping depth update with an empty bid or ask side")
        return

    order_book_snapshots.append({
        'bids': bids,
        'asks': asks,
        'timestamp': timestamp
    })

    # Spread in ticks. The division stays in Decimal: the float version
    # (e.g. 0.03 / 0.01 == 2.9999999999999996) truncated to one tick too few.
    ask_price = Decimal(str(asks[0][0]))
    bid_price = Decimal(str(bids[0][0]))
    tick_size = Decimal(str(get_tick_size_from_prices(ask_price, bid_price)))
    spread_delta = int((ask_price - bid_price) / tick_size)
    spread_delta_snapshots.append(spread_delta)

    process_depth_data()
def calculate_phi(prices, k, S0):
    """
    Evaluate φ(k, ξ) as the sample mean of exp(k * (price - S0)).

    :param prices: price time series
    :param k: parameter k
    :param S0: reference (initial) price
    :return: value of φ(k, ξ)
    """
    offsets = np.array(prices) - S0
    return np.mean(np.exp(k * offsets))
def calculate_integral_phi(prices, k, S0, time_points):
    """
    Compute ∫ φ(k, ξ) dξ over the observation window.

    φ at time index ``i`` is the mean of exp(k * (price - S0)) over
    ``prices[:i + 1]``. The integral over ``time_points`` is taken with the
    trapezoidal rule. A larger value indicates larger price fluctuation.

    :param prices: price time series
    :param k: parameter k
    :param S0: reference (initial) price
    :param time_points: time coordinate of every price, same length as prices
    :return: value of ∫ φ(k, ξ) dξ
    """
    exponentials = np.exp(k * (np.asarray(prices, dtype=float) - S0))
    # Running mean through a cumulative sum: O(n) instead of re-averaging
    # every prefix from scratch (O(n^2)).
    prefix_lengths = np.arange(1, exponentials.size + 1)
    phi_values = np.cumsum(exponentials) / prefix_lengths
    # Integrate with the trapezoidal rule.
    return trapezoid(phi_values, time_points)
def estimate_lambda(waiting_times, T):
    """
    Estimate λ(δ) from waiting times between events.

    The estimate is the number of waiting times shorter than ``T`` divided
    by the total waiting time in whole milliseconds.

    :param waiting_times: sequence of waiting times (timedelta-like values)
    :param T: size of the time window (timedelta-like)
    :return: estimate of λ(δ) as events per millisecond; ``0.0`` when no rate
             can be estimated (no waiting times, or a total waiting time
             below one millisecond)
    """
    waits = list(waiting_times)
    if not waits:
        return 0.0
    waits = pd.to_timedelta(waits)
    count_within_window = int((waits < pd.Timedelta(T)).sum())
    # Exact integer milliseconds; going through float seconds * 1000 can
    # truncate to one millisecond too few.
    total_ms = waits.sum() // pd.Timedelta(milliseconds=1)
    if total_ms <= 0:
        # Avoid dividing by zero when all events share one timestamp.
        return 0.0
    return count_within_window / total_ms
def objective_function(params, delta_max, log_lambda_hat_value, log_integral_phi_value):
    """
    Objective r(A, k): sum of squared residuals over δ = 1 .. delta_max.

    :param params: array-like holding A and k, in that order
    :param delta_max: largest price offset to include
    :param log_lambda_hat_value: value of log(λ(δ))
    :param log_integral_phi_value: value of log(∫ φ(k, ξ) dξ)
    :return: value of the objective function
    """
    A, k = params
    squared_residuals = [
        (log_lambda_hat_value + k * delta - np.log(A) - log_integral_phi_value) ** 2
        for delta in range(1, delta_max + 1)
    ]
    return np.sum(squared_residuals)
def process_depth_data():
    """
    Recalibrate A and k from the buffered order-book, spread and trade data.

    Steps: build mid prices from the stored snapshots, compute
    log(∫ φ(k, ξ) dξ), estimate log(λ(δ)) from trade waiting times, then
    minimise ``objective_function`` within ``bounds``. On success the
    module-level ``A_initial`` and ``k_initial`` are replaced by the fitted
    values, so they seed the next run. ``S0`` is set once, from the first
    mid price seen.

    Returns without fitting while the buffers are still warming up, or when
    an intermediate value cannot be passed through a logarithm.
    """
    global k_initial, A_initial, S0

    # Warm-up: at least 10 depth snapshots and 100 trades are required.
    if len(order_book_snapshots) < 10 or len(trade_snapshots) < 100:
        return
    if not spread_delta_snapshots:
        # No spread recorded yet, so there is no delta_max to fit against.
        return

    S_values = [(snapshot['bids'][0][0] + snapshot['asks'][0][0]) / 2
                for snapshot in order_book_snapshots]
    if S0 < 0:
        S0 = S_values[0]

    # ---------------- log(∫ φ(k, ξ) dξ) ----------------
    # Time axis in seconds, relative to the oldest buffered snapshot.
    order_book_timestamps = [snapshot['timestamp'] for snapshot in order_book_snapshots]
    first_timestamp = order_book_timestamps[0]
    order_book_time_points = [(timestamp - first_timestamp).total_seconds()
                              for timestamp in order_book_timestamps]
    integral_phi_value = calculate_integral_phi(S_values, k_initial, S0, order_book_time_points)
    if not np.isfinite(integral_phi_value) or integral_phi_value <= 0:
        # log() of zero or an overflowed integral would hand -inf/NaN to the optimizer.
        logger.warning(f"Skipping calibration: invalid phi integral {integral_phi_value}")
        return
    log_integral_phi_value = np.log(integral_phi_value)

    # ---------------- estimate λ(δ) ----------------
    # Waiting times between consecutive trades.
    trade_timestamps = [snapshot['timestamp'] for snapshot in trade_snapshots]
    waiting_times = [later - earlier
                     for earlier, later in zip(trade_timestamps, trade_timestamps[1:])]
    # Size of the time window: 100 ms.
    T = pd.Timedelta(milliseconds=100)
    lambda_hat = estimate_lambda(waiting_times, T)
    if not np.isfinite(lambda_hat) or lambda_hat <= 0:
        logger.warning(f"Skipping calibration: invalid lambda estimate {lambda_hat}")
        return
    log_lambda_hat_value = np.log(lambda_hat)

    # ---------------- calibrate A and k ----------------
    delta_max = int(max(spread_delta_snapshots))
    result = minimize(objective_function, np.array([A_initial, k_initial]),
                      args=(delta_max, log_lambda_hat_value, log_integral_phi_value),
                      bounds=bounds)
    if result.success:
        A_initial, k_initial = result.x
        logger.info(f"Optimal A: {A_initial}, Optimal k: {k_initial}")
    else:
        logger.error("Optimization failed")
|