# binance_gp_demo.py
  1. import threading
  2. import websocket
  3. import json
  4. import pandas as pd
  5. import numpy as np
  6. from scipy.optimize import minimize
  7. import time
  8. import warnings
  9. import logging
  10. import colorlog
  11. # 忽略 FutureWarning
  12. warnings.simplefilter(action='ignore', category=FutureWarning)
  13. # 配置日志
  14. handler = colorlog.StreamHandler()
  15. handler.setFormatter(colorlog.ColoredFormatter(
  16. "%(log_color)s%(asctime)s - %(name)s - %(levelname)s \n %(message)s",
  17. datefmt=None,
  18. reset=True,
  19. log_colors={
  20. 'DEBUG': 'cyan',
  21. 'INFO': 'blue',
  22. 'WARNING': 'yellow',
  23. 'ERROR': 'red',
  24. 'CRITICAL': 'bold_red',
  25. }
  26. ))
  27. logger = logging.getLogger("binance_gp_demo")
  28. logger.setLevel(logging.INFO)
  29. logger.addHandler(handler)
  30. # 步骤二:订阅Binance的成交数据和订单簿数据
  31. # Binance WebSocket API URL
  32. SOCKET_TRADE = "wss://stream.binance.com:9443/ws/notusdt@trade"
  33. SOCKET_DEPTH = "wss://stream.binance.com:9443/ws/notusdt@depth20@100ms"
  34. # Initialize the DataFrame
  35. df_trades = pd.DataFrame(columns=['price', 'qty', 'timestamp'])
  36. df_order_book = pd.DataFrame(columns=['bid_price', 'bid_qty', 'ask_price', 'ask_qty'])
  37. def on_message_trade(_ws, message):
  38. global df_trades
  39. json_message = json.loads(message)
  40. trade = {
  41. 'price': float(json_message['p']),
  42. 'qty': float(json_message['q']),
  43. 'timestamp': pd.to_datetime(json_message['T'], unit='ms')
  44. }
  45. trade_df = pd.DataFrame([trade])
  46. if not trade_df.empty and not trade_df.isna().all().all():
  47. df_trades = pd.concat([df_trades, trade_df], ignore_index=True)
  48. # Function to handle order book messages
  49. def on_message_depth(_ws, message):
  50. global df_order_book
  51. json_message = json.loads(message)
  52. bids = json_message['bids'][:10] # Top 10 bids
  53. asks = json_message['asks'][:10] # Top 10 asks
  54. order_book = {
  55. 'bid_price': [float(bid[0]) for bid in bids],
  56. 'bid_qty': [float(bid[1]) for bid in bids],
  57. 'ask_price': [float(ask[0]) for ask in asks],
  58. 'ask_qty': [float(ask[1]) for ask in asks]
  59. }
  60. df_order_book = pd.DataFrame([order_book])
  61. def on_error(_ws, error):
  62. logger.error(error)
  63. def on_open(_ws):
  64. print("### opened ###")
  65. # Create a WebSocket app
  66. ws_trade = websocket.WebSocketApp(SOCKET_TRADE, on_message=on_message_trade, on_error=on_error)
  67. ws_depth = websocket.WebSocketApp(SOCKET_DEPTH, on_message=on_message_depth, on_error=on_error)
  68. # 定义要传递给 run_forever 的参数
  69. http_proxy_host = "127.0.0.1"
  70. http_proxy_port = 7890
  71. proxy_type = "http"
  72. # Run the WebSocket with proxy settings
  73. trade_thread = threading.Thread(target=ws_trade.run_forever, kwargs={
  74. 'http_proxy_host': http_proxy_host,
  75. 'http_proxy_port': http_proxy_port,
  76. 'proxy_type': proxy_type
  77. })
  78. depth_thread = threading.Thread(target=ws_depth.run_forever, kwargs={
  79. 'http_proxy_host': http_proxy_host,
  80. 'http_proxy_port': http_proxy_port,
  81. 'proxy_type': proxy_type
  82. })
  83. trade_thread.start()
  84. depth_thread.start()
  85. # 步骤三:价差随机过程建模
  86. # Define the states and transition matrix
  87. states = ['tight', 'normal', 'wide']
  88. transition_matrix = np.zeros((3, 3))
  89. # Function to update the transition matrix based on historical data
  90. def update_transition_matrix(df):
  91. global transition_matrix
  92. for i in range(len(df) - 1):
  93. current_state = df['state'].iloc[i]
  94. next_state = df['state'].iloc[i + 1]
  95. transition_matrix[states.index(current_state), states.index(next_state)] += 1
  96. transition_matrix = transition_matrix / transition_matrix.sum(axis=1, keepdims=True)
  97. # Function to classify the spread into states
  98. def classify_spread(spread):
  99. if spread < 0.01:
  100. return 'tight'
  101. elif spread < 0.02:
  102. return 'normal'
  103. else:
  104. return 'wide'
  105. # Function to calculate spread and classify it
  106. def calculate_and_classify_spread():
  107. global df_trades
  108. df_trades['spread'] = df_trades['price'].diff().abs()
  109. df_trades['state'] = df_trades['spread'].apply(classify_spread)
  110. # Update the transition matrix periodically
  111. def update_transition_matrix_periodically():
  112. calculate_and_classify_spread()
  113. update_transition_matrix(df_trades)
  114. logger.info("Transition Matrix:\n%s", transition_matrix)
  115. # Run the update function periodically, e.g., every minute
  116. transition_matrix_update_thread = threading.Timer(60.0, update_transition_matrix_periodically)
  117. transition_matrix_update_thread.start()