data_processing.py 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. import json
  2. import pandas as pd
  3. import queue
  4. import threading
  5. from collections import deque
  6. from scipy.optimize import minimize
  7. from logger_config import logger
  8. import numpy as np
  9. # 假设我们有一个数据流,订单簿和成交数据
  10. order_book_snapshots = deque(maxlen=1000) # 存储过去10000个订单簿快照
  11. # 数据积累的阈值
  12. DATA_THRESHOLD = 20
  13. messages = queue.Queue() # 创建一个线程安全队列
  14. stop_event = threading.Event()
  15. # 初始参数
  16. k_initial = 0.5
  17. A_initial = 1.0
  18. # 假设S0是初始的参考价格
  19. S0 = -1
  20. sigma = 0.2 # 假设一个波动率
  21. def on_message_depth(_ws, message):
  22. global order_book_snapshots
  23. json_message = json.loads(message)
  24. bids = [[float(price), float(quantity)] for price, quantity in json_message['data']['b'][:10]]
  25. asks = [[float(price), float(quantity)] for price, quantity in json_message['data']['a'][:10]]
  26. timestamp = pd.to_datetime(json_message['data']['E'], unit='ms')
  27. order_book_snapshots.append({
  28. 'bids': bids,
  29. 'asks': asks,
  30. 'timestamp': timestamp
  31. })
  32. if len(order_book_snapshots) >= DATA_THRESHOLD:
  33. process_depth_data(order_book_snapshots)
  34. def phi(k, S, S0):
  35. return np.exp(k * (S - S0))
  36. def target_function(params, S_values):
  37. A, k = params
  38. r = 0
  39. for S in S_values:
  40. term1 = np.log(phi(k, S, S0))
  41. r += (term1 - np.log(A)) ** 2
  42. return r
  43. def process_depth_data(order_book_snapshots):
  44. global k_initial, A_initial, S0
  45. S_values = [((snapshot['bids'][0][0] + snapshot['asks'][0][0]) / 2) for snapshot in order_book_snapshots] # 使用买价计算
  46. if S0 == 0:
  47. S0 = S_values[0]
  48. # 使用优化算法最小化目标函数,找到最优的A和k
  49. params = np.array([A_initial, k_initial])
  50. result = minimize(target_function, params, args=(S_values,))
  51. A_opt, k_opt = result.x
  52. logger.info("最优参数 A:" + str(A_opt) + " k:" + str(k_opt))
  53. # 更新初始参数
  54. A_initial = A_opt
  55. k_initial = k_opt