import threading
import websocket
import json
import pandas as pd
import numpy as np
from scipy.optimize import minimize
import time
import warnings
import logging
import colorlog

# Silence FutureWarning globally.
warnings.simplefilter(action='ignore', category=FutureWarning)

# Logging setup: one colorized stream handler attached to the demo logger.
_formatter = colorlog.ColoredFormatter(
    "%(log_color)s%(asctime)s - %(name)s - %(levelname)s \n %(message)s",
    datefmt=None,
    reset=True,
    log_colors={
        'DEBUG': 'cyan',
        'INFO': 'blue',
        'WARNING': 'yellow',
        'ERROR': 'red',
        'CRITICAL': 'bold_red',
    },
)
handler = colorlog.StreamHandler()
handler.setFormatter(_formatter)

logger = logging.getLogger("binance_gp_demo")
logger.setLevel(logging.INFO)
logger.addHandler(handler)

# Step 2: subscribe to Binance trade data and order book data.
# Binance WebSocket API URLs
SOCKET_TRADE = "wss://stream.binance.com:9443/ws/btcusdt@trade"
SOCKET_DEPTH = "wss://stream.binance.com:9443/ws/btcusdt@depth20@100ms"

# Module-level state shared with the worker threads.
df_trades = pd.DataFrame(columns=['price', 'qty', 'timestamp'])
df_order_book = pd.DataFrame(columns=['bid_price', 'bid_qty', 'ask_price', 'ask_qty'])


def on_message_trade(_ws, message):
    """Append one trade message to the global ``df_trades`` frame.

    The JSON payload's ``p``, ``q`` and ``T`` fields are stored as ``price``
    (float), ``qty`` (float) and ``timestamp`` (``T`` parsed as epoch
    milliseconds). ``df_trades`` is rebound to a new concatenated frame; the
    previous frame object is not modified in place.

    Args:
        _ws: The websocket app instance (unused).
        message: Raw JSON text of the trade message.
    """
    global df_trades
    payload = json.loads(message)
    row = pd.DataFrame([{
        'price': float(payload['p']),
        'qty': float(payload['q']),
        'timestamp': pd.to_datetime(payload['T'], unit='ms'),
    }])
    # Skip rows that are empty or entirely NA before concatenating.
    if row.empty or row.isna().all().all():
        return
    df_trades = pd.concat([df_trades, row], ignore_index=True)


def on_message_depth(_ws, message):
    """Replace the global ``df_order_book`` with the latest depth snapshot.

    Only the first 10 entries of the payload's ``bids`` and ``asks`` are kept.
    The resulting frame has a single row whose four cells each hold a list of
    floats (prices or quantities, one element per level).

    Args:
        _ws: The websocket app instance (unused).
        message: Raw JSON text of the depth message.
    """
    global df_order_book
    payload = json.loads(message)
    top_bids = payload['bids'][:10]  # Top 10 bids
    top_asks = payload['asks'][:10]  # Top 10 asks

    def _column(levels, position):
        # Element 0 of a level is read as the price, element 1 as the quantity.
        return [float(level[position]) for level in levels]

    df_order_book = pd.DataFrame([{
        'bid_price': _column(top_bids, 0),
        'bid_qty': _column(top_bids, 1),
        'ask_price': _column(top_asks, 0),
        'ask_qty': _column(top_asks, 1),
    }])


def on_error(_ws, error):
    """Log a websocket error at ERROR level."""
    logger.error(error)


def on_open(_ws):
    """Print a marker line when a websocket connection opens."""
    print("### opened ###")


# Create a WebSocket app
# on_open is registered as well so that a successful connection is reported.
ws_trade = websocket.WebSocketApp(
    SOCKET_TRADE,
    on_open=on_open,
    on_message=on_message_trade,
    on_error=on_error,
)
ws_depth = websocket.WebSocketApp(
    SOCKET_DEPTH,
    on_open=on_open,
    on_message=on_message_depth,
    on_error=on_error,
)

# Parameters forwarded to run_forever
http_proxy_host = "127.0.0.1"
http_proxy_port = 7890
proxy_type = "http"

_proxy_kwargs = {
    'http_proxy_host': http_proxy_host,
    'http_proxy_port': http_proxy_port,
    'proxy_type': proxy_type,
}

# Run each WebSocket in its own thread with the proxy settings
trade_thread = threading.Thread(target=ws_trade.run_forever, kwargs=dict(_proxy_kwargs))
depth_thread = threading.Thread(target=ws_depth.run_forever, kwargs=dict(_proxy_kwargs))

trade_thread.start()
depth_thread.start()

# Step 3: model the spread as a stochastic process.
# Define the states and transition matrix
states = ['tight', 'normal', 'wide']
transition_matrix = np.zeros((3, 3))


def update_transition_matrix(df):
    """Rebuild the global ``transition_matrix`` from the ``state`` column of *df*.

    Consecutive rows of ``df['state']`` are counted as transitions, and each
    row of the count matrix is normalized to sum to 1 (rows with no observed
    transitions stay all-zero).

    The counts are rebuilt from scratch on every call. Adding them onto the
    previous, already normalized, matrix would mix probabilities with raw
    counts and re-count the same history each cycle.

    Args:
        df: DataFrame with a ``state`` column. Values that are not in
            ``states`` (e.g. ``None``/NaN for unclassified rows) are skipped.
    """
    global transition_matrix
    state_index = {name: position for position, name in enumerate(states)}
    counts = np.zeros((len(states), len(states)))

    observed = df['state'].tolist()
    for current_state, next_state in zip(observed, observed[1:]):
        row = state_index.get(current_state)
        col = state_index.get(next_state)
        if row is None or col is None:
            # Unclassified row on either side: not a usable transition.
            continue
        counts[row, col] += 1

    row_sums = counts.sum(axis=1, keepdims=True)
    row_sums[row_sums == 0] = 1  # avoid division by zero for unseen states
    transition_matrix = counts / row_sums


def classify_spread(spread):
    """Map a spread value to ``'tight'``, ``'normal'`` or ``'wide'``.

    Returns ``None`` for a missing (NaN/None) spread. Without this guard a NaN
    fails both comparisons and would be reported as ``'wide'``.
    """
    if pd.isna(spread):
        return None
    if spread < 0.01:
        return 'tight'
    if spread < 0.02:
        return 'normal'
    return 'wide'


def calculate_and_classify_spread():
    """Add ``spread`` and ``state`` columns to the current ``df_trades`` frame.

    ``spread`` is the absolute difference between consecutive trade prices
    (not a bid/ask spread); ``state`` is ``classify_spread`` applied to it.

    Returns:
        The frame object that was annotated. Callers should use this return
        value rather than re-reading the global ``df_trades``, which may be
        rebound to a new frame without these columns filled in.
    """
    frame = df_trades  # bind once so both columns land on the same object
    frame['spread'] = frame['price'].diff().abs()
    frame['state'] = frame['spread'].apply(classify_spread)
    return frame


# Event used to stop the periodic worker loops
stop_event = threading.Event()


def update_transition_matrix_periodically():
    """Reclassify trades and rebuild the transition matrix every 5 seconds.

    Runs until ``stop_event`` is set. A failure in one cycle is logged and the
    loop continues, so a single bad cycle does not end the thread.
    """
    while not stop_event.is_set():
        try:
            classified = calculate_and_classify_spread()
            update_transition_matrix(classified)
            current_state = classified['state'].iloc[-1] if not classified.empty else 'unknown'
            logger.debug(
                "Current State: %s\nTransition Matrix:\n%s\n",
                current_state,
                transition_matrix,
            )
        except Exception:
            logger.exception("Transition matrix update failed")
        stop_event.wait(5)  # update every 5 seconds


# Create and start the thread
transition_matrix_update_thread = threading.Thread(target=update_transition_matrix_periodically)
transition_matrix_update_thread.start()

# Step 4: parameter estimation - estimate fill intensity from order book data.
# Global holding the previous order book snapshot
previous_order_book = None


def estimate_fill_probabilities_and_order_flow(order_book, trades, max_levels=10):
    """Estimate per-level fill ratios and aggregate order flow.

    For each book level, trades priced within 0.01 below the bid (or within
    0.01 above the ask) are summed and divided by the quantity resting at that
    level. Order flow is derived from the per-level quantity change against
    the previous snapshot: increases count as new orders, decreases as
    cancellations, and matched trade quantity as executions.

    The first call only stores the snapshot and returns empty/zero results,
    because there is no previous book to compare with.

    NOTE(review): levels are compared by index, not by price, and *trades* is
    not restricted to the interval since the previous snapshot - confirm this
    is the intended estimator.

    Args:
        order_book: Single-row DataFrame whose ``bid_price``, ``bid_qty``,
            ``ask_price`` and ``ask_qty`` cells each hold a list of floats.
        trades: DataFrame with ``price`` and ``qty`` columns.
        max_levels: Maximum number of levels to evaluate. Fewer are used when
            either the current or the previous book is shallower.

    Returns:
        Tuple ``(fill_probabilities, order_flow)``: a dict keyed
        ``bid_1..bid_N`` / ``ask_1..ask_N`` and a dict with ``new_orders``,
        ``cancellations`` and ``executions`` totals.
    """
    global previous_order_book
    fill_probabilities = {}
    order_flow = {'new_orders': 0, 'cancellations': 0, 'executions': 0}

    if previous_order_book is not None:
        bid_prices = order_book['bid_price'].iloc[0]
        ask_prices = order_book['ask_price'].iloc[0]
        bid_qtys = order_book['bid_qty'].iloc[0]
        ask_qtys = order_book['ask_qty'].iloc[0]
        prev_bid_qtys = previous_order_book['bid_qty'].iloc[0]
        prev_ask_qtys = previous_order_book['ask_qty'].iloc[0]

        # Only walk levels present on both sides of both snapshots.
        depth = min(
            max_levels,
            len(bid_prices), len(ask_prices),
            len(bid_qtys), len(ask_qtys),
            len(prev_bid_qtys), len(prev_ask_qtys),
        )
        trade_prices = trades['price']

        for level in range(depth):
            bid_price = bid_prices[level]
            ask_price = ask_prices[level]
            bid_qty = bid_qtys[level]
            ask_qty = ask_qtys[level]
            prev_bid_qty = prev_bid_qtys[level]
            prev_ask_qty = prev_ask_qtys[level]

            # Ratio of traded volume to resting quantity at this level
            fill_bid = trades[(trade_prices <= bid_price) & (trade_prices > bid_price - 0.01)]
            fill_ask = trades[(trade_prices >= ask_price) & (trade_prices < ask_price + 0.01)]
            filled_bid_qty = fill_bid['qty'].sum()
            filled_ask_qty = fill_ask['qty'].sum()

            fill_probabilities[f'bid_{level+1}'] = filled_bid_qty / bid_qty if bid_qty > 0 else 0
            fill_probabilities[f'ask_{level+1}'] = filled_ask_qty / ask_qty if ask_qty > 0 else 0

            # Order flow from the quantity change at this level
            new_bid_orders = max(0, bid_qty - prev_bid_qty)
            new_ask_orders = max(0, ask_qty - prev_ask_qty)
            cancellations_bid = max(0, prev_bid_qty - bid_qty)
            cancellations_ask = max(0, prev_ask_qty - ask_qty)

            order_flow['new_orders'] += new_bid_orders + new_ask_orders
            order_flow['cancellations'] += cancellations_bid + cancellations_ask
            order_flow['executions'] += filled_bid_qty + filled_ask_qty

    # Remember this snapshot for the next call
    previous_order_book = order_book.copy()

    return fill_probabilities, order_flow


def estimate_fill_probabilities_and_order_flow_periodically():
    """Log fill probabilities and order flow every 5 seconds.

    Runs until ``stop_event`` is set. Each cycle reads the globals once so the
    emptiness check and the estimate see the same frames. A failure in one
    cycle is logged and the loop continues.
    """
    while not stop_event.is_set():
        order_book, trades = df_order_book, df_trades
        if not order_book.empty and not trades.empty:
            try:
                fill_probabilities, order_flow = estimate_fill_probabilities_and_order_flow(
                    order_book, trades
                )
                logger.info("Fill Probabilities:\n%s", fill_probabilities)
                logger.info("Order Flow:\n%s\n", order_flow)
            except Exception:
                logger.exception("Fill probability estimation failed")
        stop_event.wait(5)  # update every 5 seconds


# Create and start the thread
fill_probabilities_thread = threading.Thread(target=estimate_fill_probabilities_and_order_flow_periodically)
fill_probabilities_thread.start()