import json
import queue
import threading
from collections import deque

import numpy as np
import pandas as pd
from scipy.optimize import minimize

from logger_config import logger

# Rolling window of order-book snapshots; the deque keeps only the most
# recent 1000 entries and silently drops older ones.
order_book_snapshots = deque(maxlen=1000)

# Minimum number of accumulated snapshots before calibration is attempted.
DATA_THRESHOLD = 20

messages = queue.Queue()  # thread-safe queue
stop_event = threading.Event()

# Initial parameters; overwritten with the latest optimum after each
# calibration so the next run is warm-started.
k_initial = 0.5
A_initial = 1.0

# Reference price. A non-positive value means "not set yet"; it is replaced
# by the first observed mid price in process_depth_data.
S0 = -1
sigma = 0.2  # assumed volatility


def on_message_depth(_ws, message):
    """Parse one depth message, store it, and calibrate once enough data exists.

    The payload is decoded as JSON. The first ten entries of
    ``data['b']`` (bids) and ``data['a']`` (asks) are converted to
    ``[price, quantity]`` float pairs, and ``data['E']`` is read as a
    millisecond timestamp. The snapshot is appended to the module-level
    ``order_book_snapshots`` window; once the window holds at least
    ``DATA_THRESHOLD`` snapshots, ``process_depth_data`` runs on every
    message.

    Args:
        _ws: Unused; present so the function matches the callback signature
            of the caller (presumably a websocket client -- confirm).
        message: JSON text of the depth update.

    Raises:
        KeyError: If ``data``, ``b``, ``a`` or ``E`` is missing.
        ValueError: If the payload is not valid JSON or a price/quantity is
            not numeric.
    """
    data = json.loads(message)['data']

    bids = [[float(price), float(quantity)] for price, quantity in data['b'][:10]]
    asks = [[float(price), float(quantity)] for price, quantity in data['a'][:10]]
    timestamp = pd.to_datetime(data['E'], unit='ms')

    # The deque is only mutated, never rebound, so no ``global`` is needed.
    order_book_snapshots.append({
        'bids': bids,
        'asks': asks,
        'timestamp': timestamp
    })

    if len(order_book_snapshots) >= DATA_THRESHOLD:
        process_depth_data(order_book_snapshots)


def phi(k, S, S0):
    """Return ``exp(k * (S - S0))``."""
    return np.exp(k * (S - S0))


def target_function(params, S_values):
    """Return the sum of squared residuals ``(log(phi(k, S, S0)) - log(A))**2``.

    Args:
        params: Pair ``(A, k)``.
        S_values: Sequence of prices; the module-level ``S0`` is used as the
            reference price.

    Returns:
        The objective value as a float (``0.0`` for empty ``S_values``).
    """
    A, k = params
    # log(phi(k, S, S0)) == k * (S - S0); using the closed form avoids the
    # overflow to inf that exp() hits for large k * (S - S0).
    log_phi = k * (np.asarray(S_values, dtype=float) - S0)
    return float(np.sum((log_phi - np.log(A)) ** 2))


def process_depth_data(order_book_snapshots):
    """Fit ``A`` and ``k`` to the mid prices of the given snapshots.

    The mid price of each snapshot is the average of its best bid and best
    ask. ``target_function`` is minimised starting from the module-level
    ``A_initial`` / ``k_initial``, the optimum is logged, and both globals are
    updated so the next call starts from it. ``S0`` is set to the first mid
    price the first time usable data is seen.

    Args:
        order_book_snapshots: Iterable of dicts with ``'bids'`` and ``'asks'``
            lists of ``[price, quantity]`` pairs, best level first.
    """
    global k_initial, A_initial, S0

    # Mid price per snapshot. Snapshots with an empty side have no mid price
    # and are skipped instead of raising IndexError.
    S_values = [
        (snapshot['bids'][0][0] + snapshot['asks'][0][0]) / 2
        for snapshot in order_book_snapshots
        if snapshot['bids'] and snapshot['asks']
    ]
    if not S_values:
        return

    # S0 starts at -1, so the former ``S0 == 0`` test never fired and the
    # reference price was never initialised from data.
    if S0 <= 0:
        S0 = S_values[0]

    # Minimise the objective to find the optimal A and k.
    params = np.array([A_initial, k_initial])
    result = minimize(target_function, params, args=(S_values,))
    A_opt, k_opt = result.x

    # A NaN/inf optimum would poison every later warm start; keep the
    # previous parameters in that case.
    if not np.all(np.isfinite(result.x)):
        logger.warning("Calibration returned non-finite parameters; keeping previous A and k")
        return

    logger.info("最优参数 A:" + str(A_opt) + " k:" + str(k_opt))

    # Warm-start the next calibration from this optimum.
    A_initial = A_opt
    k_initial = k_opt