binance_gp_demo.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. import threading
  2. import websocket
  3. import json
  4. import pandas as pd
  5. import numpy as np
  6. from scipy.optimize import minimize
  7. import time
  8. import warnings
  9. import logging
  10. import colorlog
  11. # 忽略 FutureWarning
  12. warnings.simplefilter(action='ignore', category=FutureWarning)
  13. # 配置日志
  14. handler = colorlog.StreamHandler()
  15. handler.setFormatter(colorlog.ColoredFormatter(
  16. "%(log_color)s%(asctime)s - %(name)s - %(levelname)s \n %(message)s",
  17. datefmt=None,
  18. reset=True,
  19. log_colors={
  20. 'DEBUG': 'cyan',
  21. 'INFO': 'blue',
  22. 'WARNING': 'yellow',
  23. 'ERROR': 'red',
  24. 'CRITICAL': 'bold_red',
  25. }
  26. ))
  27. logger = logging.getLogger("binance_gp_demo")
  28. logger.setLevel(logging.INFO)
  29. logger.addHandler(handler)
  30. # 步骤二:订阅Binance的成交数据和订单簿数据
  31. # Binance WebSocket API URL
  32. SOCKET_TRADE = "wss://stream.binance.com:9443/ws/btcusdt@trade"
  33. SOCKET_DEPTH = "wss://stream.binance.com:9443/ws/btcusdt@depth20@100ms"
  34. # Initialize the DataFrame
  35. df_trades = pd.DataFrame(columns=['price', 'qty', 'timestamp'])
  36. df_order_book = pd.DataFrame(columns=['bid_price', 'bid_qty', 'ask_price', 'ask_qty'])
  37. def on_message_trade(_ws, message):
  38. global df_trades
  39. json_message = json.loads(message)
  40. trade = {
  41. 'price': float(json_message['p']),
  42. 'qty': float(json_message['q']),
  43. 'timestamp': pd.to_datetime(json_message['T'], unit='ms')
  44. }
  45. trade_df = pd.DataFrame([trade])
  46. if not trade_df.empty and not trade_df.isna().all().all():
  47. df_trades = pd.concat([df_trades, trade_df], ignore_index=True)
  48. # Function to handle order book messages
  49. def on_message_depth(_ws, message):
  50. global df_order_book
  51. json_message = json.loads(message)
  52. bids = json_message['bids'][:10] # Top 10 bids
  53. asks = json_message['asks'][:10] # Top 10 asks
  54. order_book = {
  55. 'bid_price': [float(bid[0]) for bid in bids],
  56. 'bid_qty': [float(bid[1]) for bid in bids],
  57. 'ask_price': [float(ask[0]) for ask in asks],
  58. 'ask_qty': [float(ask[1]) for ask in asks]
  59. }
  60. df_order_book = pd.DataFrame([order_book])
  61. def on_error(_ws, error):
  62. logger.error(error)
  63. def on_open(_ws):
  64. print("### opened ###")
  65. # Create a WebSocket app
  66. ws_trade = websocket.WebSocketApp(SOCKET_TRADE, on_message=on_message_trade, on_error=on_error)
  67. ws_depth = websocket.WebSocketApp(SOCKET_DEPTH, on_message=on_message_depth, on_error=on_error)
  68. # 定义要传递给 run_forever 的参数
  69. http_proxy_host = "127.0.0.1"
  70. http_proxy_port = 7890
  71. proxy_type = "http"
  72. # Run the WebSocket with proxy settings
  73. trade_thread = threading.Thread(target=ws_trade.run_forever, kwargs={
  74. 'http_proxy_host': http_proxy_host,
  75. 'http_proxy_port': http_proxy_port,
  76. 'proxy_type': proxy_type
  77. })
  78. depth_thread = threading.Thread(target=ws_depth.run_forever, kwargs={
  79. 'http_proxy_host': http_proxy_host,
  80. 'http_proxy_port': http_proxy_port,
  81. 'proxy_type': proxy_type
  82. })
  83. trade_thread.start()
  84. depth_thread.start()
  85. # 步骤三:价差随机过程建模
  86. # Define the states and transition matrix
  87. states = ['tight', 'normal', 'wide']
  88. transition_matrix = np.zeros((3, 3))
  89. # 定义函数来更新转移矩阵
  90. def update_transition_matrix(df):
  91. global transition_matrix
  92. for i in range(len(df) - 1):
  93. current_state = df['state'].iloc[i]
  94. next_state = df['state'].iloc[i + 1]
  95. transition_matrix[states.index(current_state), states.index(next_state)] += 1
  96. row_sums = transition_matrix.sum(axis=1, keepdims=True)
  97. row_sums[row_sums == 0] = 1
  98. transition_matrix = transition_matrix / row_sums
  99. # 定义函数来分类价差状态
  100. def classify_spread(spread):
  101. if spread < 0.01:
  102. return 'tight'
  103. elif spread < 0.02:
  104. return 'normal'
  105. else:
  106. return 'wide'
  107. # 定义函数来计算价差并进行分类
  108. def calculate_and_classify_spread():
  109. global df_trades
  110. df_trades['spread'] = df_trades['price'].diff().abs()
  111. df_trades['state'] = df_trades['spread'].apply(classify_spread)
  112. # 定义周期性更新转移矩阵的函数
  113. stop_event = threading.Event()
  114. def update_transition_matrix_periodically():
  115. while not stop_event.is_set():
  116. calculate_and_classify_spread()
  117. update_transition_matrix(df_trades)
  118. current_state = df_trades['state'].iloc[-1] if not df_trades.empty else 'unknown'
  119. logger.info("Current State: %s\nTransition Matrix:\n%s\n", current_state, transition_matrix)
  120. stop_event.wait(5) # 每5秒更新一次
  121. # 创建并启动线程
  122. transition_matrix_update_thread = threading.Thread(target=update_transition_matrix_periodically)
  123. transition_matrix_update_thread.start()