| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172 |
- import json
- import pandas as pd
- import queue
- import threading
- from collections import deque
- from scipy.optimize import minimize
- from logger_config import logger
- import numpy as np
# --- Shared state for the order book / trade data stream ---

# Rolling buffer of order book snapshots; keeps at most the 1000 most recent.
order_book_snapshots = deque(maxlen=1000)

# Minimum number of buffered snapshots before on_message_depth starts
# calling process_depth_data.
DATA_THRESHOLD = 20

# Thread-safe message queue and a stop flag (neither is used by the code
# visible in this part of the file).
messages = queue.Queue()
stop_event = threading.Event()

# Starting guesses for the fitted parameters A and k.
A_initial, k_initial = 1.0, 0.5

# Reference price used by the fit. -1 is a placeholder, not a real price.
# NOTE(review): presumably it is replaced with an observed mid price at
# runtime -- verify against process_depth_data.
S0 = -1

# Assumed volatility.
sigma = 0.2
def on_message_depth(_ws, message):
    """Handle one depth message: buffer a snapshot and maybe trigger a fit.

    Args:
        _ws: Connection object passed by the caller; not used here.
        message: JSON text whose ``data`` object carries ``b`` (bids) and
            ``a`` (asks) as sequences of (price, quantity) pairs, and ``E``,
            a timestamp in milliseconds.

    The first 10 levels of each side are converted to floats and appended,
    together with the parsed timestamp, to the module-level
    ``order_book_snapshots`` buffer. Once the buffer holds at least
    ``DATA_THRESHOLD`` snapshots, ``process_depth_data`` is called with it.
    """
    payload = json.loads(message)['data']

    def _top_levels(side):
        # Keep only the first 10 levels, as [price, quantity] float pairs.
        return [[float(px), float(qty)] for px, qty in payload[side][:10]]

    snapshot = {
        'bids': _top_levels('b'),
        'asks': _top_levels('a'),
        'timestamp': pd.to_datetime(payload['E'], unit='ms'),
    }
    # The deque is mutated in place, so no `global` declaration is needed.
    order_book_snapshots.append(snapshot)

    if len(order_book_snapshots) < DATA_THRESHOLD:
        return
    process_depth_data(order_book_snapshots)
def phi(k, S, S0):
    """Return ``exp(k * (S - S0))``.

    Args:
        k: Exponent scale.
        S: Price (scalar or NumPy array).
        S0: Reference price subtracted from ``S``.
    """
    offset = S - S0
    return np.exp(k * offset)
def target_function(params, S_values):
    """Sum of squared residuals ``(log(phi(k, S, S0)) - log(A)) ** 2``.

    Args:
        params: Pair ``(A, k)`` (any length-2 sequence or array).
        S_values: Iterable of prices; the module-level ``S0`` is the
            reference price.

    Returns:
        The objective value as a float (``0.0`` for empty ``S_values``).

    ``log(phi(k, S, S0))`` is ``log(exp(k * (S - S0)))``, which equals
    ``k * (S - S0)``. The exponent is used directly so that large values
    (above roughly 709) do not overflow to ``inf`` in the exp/log round trip.
    """
    A, k = params
    prices = np.asarray(S_values, dtype=float)
    # NOTE: A <= 0 still yields nan/-inf from np.log, as in the original form.
    residuals = k * (prices - S0) - np.log(A)
    return float(np.sum(residuals ** 2))
def process_depth_data(order_book_snapshots):
    """Fit A and k to the buffered mid prices and warm-start the next fit.

    Args:
        order_book_snapshots: Iterable of dicts with ``'bids'`` and
            ``'asks'`` lists of ``[price, quantity]`` pairs, best level
            first.

    Side effects:
        Seeds the module-level ``S0`` with the first mid price while it still
        holds a non-positive placeholder, logs the fitted parameters, and
        stores them in ``A_initial`` / ``k_initial`` when they are usable.
    """
    global k_initial, A_initial, S0

    # Mid price of the best bid and best ask. Snapshots with an empty side
    # are skipped instead of raising IndexError.
    S_values = [
        (snapshot['bids'][0][0] + snapshot['asks'][0][0]) / 2
        for snapshot in order_book_snapshots
        if snapshot['bids'] and snapshot['asks']
    ]
    if not S_values:
        return

    # The module initializes S0 to -1, so an `== 0` test never fired; any
    # non-positive value is treated as "no reference price yet".
    if S0 <= 0:
        S0 = S_values[0]

    # Minimize the objective to find the best A and k, starting from the
    # previous solution.
    params = np.array([A_initial, k_initial])
    result = minimize(target_function, params, args=(S_values,))
    A_opt, k_opt = result.x
    logger.info("最优参数 A:" + str(A_opt) + " k:" + str(k_opt))

    # Only reuse the solution as the next starting point when it is usable:
    # a nan/inf value or a non-positive A (log(A) undefined) would break
    # every later fit.
    if np.all(np.isfinite(result.x)) and A_opt > 0:
        A_initial = A_opt
        k_initial = k_opt
    else:
        logger.warning(
            "Discarding unusable fit (A=%s, k=%s); keeping previous "
            "starting values" % (A_opt, k_opt)
        )
|