"""Monitor BTCUSDT trades and order-book depth on Binance over WebSocket.

Two WebSocket threads feed shared module-level state:

* the trade stream accumulates the executed quantity per price, and
* the depth stream accumulates, per price, how much resting quantity
  vanished between consecutive order-book snapshots.

A third thread periodically derives a per-price "fill probability"
(executed quantity / vanished quantity, capped at 1) and logs it.
"""
import threading
import websocket
import json
import pandas as pd
import warnings
import logging
import colorlog

# Ignore FutureWarning (pandas emits these for some concat patterns).
warnings.simplefilter(action='ignore', category=FutureWarning)

# Configure colored logging.
handler = colorlog.StreamHandler()
handler.setFormatter(colorlog.ColoredFormatter(
    "%(log_color)s%(asctime)s - %(name)s - %(levelname)s \n %(message)s",
    datefmt=None,
    reset=True,
    log_colors={
        'DEBUG': 'cyan',
        'INFO': 'blue',
        'WARNING': 'yellow',
        'ERROR': 'red',
        'CRITICAL': 'bold_red',
    }
))

logger = logging.getLogger("market_monitor")
logger.setLevel(logging.INFO)
logger.addHandler(handler)

# Binance WebSocket API URLs.
SOCKET_TRADE = "wss://stream.binance.com:9443/ws/btcusdt@trade"
SOCKET_DEPTH = "wss://stream.binance.com:9443/ws/btcusdt@depth20@100ms"

# Number of price levels per side taken from each depth snapshot.
DEPTH_LEVELS = 10

# Shared state, written by the WebSocket threads and read by the logging
# thread. Every access goes through ``state_lock``.
df_trades = pd.DataFrame(columns=['price', 'qty', 'timestamp'])
df_order_book = pd.DataFrame(columns=['bid_price', 'bid_qty', 'ask_price', 'ask_qty'])
# Last depth snapshot as {'bids': {price: qty}, 'asks': {price: qty}}.
previous_order_book = None
fill_probabilities = {}
order_disappearances = {}
order_executions = {}
last_trade_price = None

# Guards the shared state above. Without it the logging thread can iterate
# ``order_disappearances`` while the depth thread inserts a new price, which
# raises "dictionary changed size during iteration".
state_lock = threading.Lock()


def on_message_trade(_ws, message):
    """Handle one trade-stream message.

    Appends the trade to ``df_trades``, remembers its price in
    ``last_trade_price`` and adds its quantity to the running total for
    that price in ``order_executions``.

    Args:
        _ws: The WebSocketApp delivering the message (unused).
        message: JSON text with at least the keys ``p`` (price), ``q``
            (quantity) and ``T`` (timestamp in milliseconds).
    """
    global df_trades, last_trade_price
    json_message = json.loads(message)
    trade = {
        'price': float(json_message['p']),
        'qty': float(json_message['q']),
        'timestamp': pd.to_datetime(json_message['T'], unit='ms'),
    }
    trade_df = pd.DataFrame([trade])

    with state_lock:
        # The first trade replaces the empty placeholder frame instead of
        # being concatenated onto it, which avoids pandas' deprecated
        # handling of empty frames in concat.
        if df_trades.empty:
            df_trades = trade_df
        else:
            df_trades = pd.concat([df_trades, trade_df], ignore_index=True)

        # Track the total executed quantity for each price.
        price = trade['price']
        last_trade_price = price
        order_executions[price] = order_executions.get(price, 0) + trade['qty']


def _accumulate_disappearances(previous_side, current_side):
    """Add quantity decreases between two snapshots of one book side.

    Every price in ``current_side`` gets an entry in
    ``order_disappearances``. The entry grows by the drop in quantity when
    the same price was present in ``previous_side`` with a larger quantity.
    Prices missing from the previous snapshot contribute nothing.

    The caller must hold ``state_lock``.
    """
    for price, qty in current_side.items():
        order_disappearances.setdefault(price, 0)
        prev_qty = previous_side.get(price)
        if prev_qty is not None and prev_qty > qty:
            order_disappearances[price] += prev_qty - qty


def on_message_depth(_ws, message):
    """Handle one depth-stream message.

    Takes up to ``DEPTH_LEVELS`` bids and asks from the snapshot and
    compares them with the previous snapshot price by price, accumulating
    any decrease in resting quantity into ``order_disappearances``.
    Comparing by price rather than by level index keeps the subtraction
    meaningful when the book shifts, and tolerates snapshots that carry
    fewer levels than expected.

    Args:
        _ws: The WebSocketApp delivering the message (unused).
        message: JSON text with ``bids`` and ``asks`` lists whose entries
            start with ``[price, quantity]``.
    """
    global previous_order_book
    json_message = json.loads(message)
    bids = json_message['bids'][:DEPTH_LEVELS]
    asks = json_message['asks'][:DEPTH_LEVELS]

    current_order_book = {
        'bids': {float(bid[0]): float(bid[1]) for bid in bids},
        'asks': {float(ask[0]): float(ask[1]) for ask in asks},
    }

    with state_lock:
        # Nothing to compare against on the very first snapshot.
        if previous_order_book is not None:
            _accumulate_disappearances(previous_order_book['bids'],
                                       current_order_book['bids'])
            _accumulate_disappearances(previous_order_book['asks'],
                                       current_order_book['asks'])
        previous_order_book = current_order_book


def on_error(_ws, error):
    """Log an error reported by a WebSocketApp."""
    logger.error(error)


def on_open(_ws):
    """Announce that a WebSocket connection has been opened."""
    print("### opened ###")


# Create the WebSocket apps.
ws_trade = websocket.WebSocketApp(SOCKET_TRADE,
                                  on_open=on_open,
                                  on_message=on_message_trade,
                                  on_error=on_error)
ws_depth = websocket.WebSocketApp(SOCKET_DEPTH,
                                  on_open=on_open,
                                  on_message=on_message_depth,
                                  on_error=on_error)

# Proxy parameters passed to run_forever.
http_proxy_host = "127.0.0.1"
http_proxy_port = 7890
proxy_type = "http"

# Run each WebSocket on its own thread with the proxy settings.
trade_thread = threading.Thread(target=ws_trade.run_forever, kwargs={
    'http_proxy_host': http_proxy_host,
    'http_proxy_port': http_proxy_port,
    'proxy_type': proxy_type
})
depth_thread = threading.Thread(target=ws_depth.run_forever, kwargs={
    'http_proxy_host': http_proxy_host,
    'http_proxy_port': http_proxy_port,
    'proxy_type': proxy_type
})

trade_thread.start()
depth_thread.start()

stop_event = threading.Event()


def calculate_fill_probabilities():
    """Recompute ``fill_probabilities`` from the accumulated totals.

    For every price in ``order_disappearances`` the probability is
    ``executed / disappeared`` capped at 1. It is 0 when nothing was
    executed at that price or nothing has disappeared yet.
    """
    global fill_probabilities
    probabilities = {}
    with state_lock:
        for price, disappeared in order_disappearances.items():
            executed = order_executions.get(price)
            if executed is None or disappeared <= 0:
                probabilities[price] = 0
            else:
                # Keep the probability from exceeding 1.
                probabilities[price] = min(executed / disappeared, 1)
    fill_probabilities = probabilities


def log_fill_probabilities_periodically():
    """Recompute and log the fill probabilities every 5 seconds.

    Runs until ``stop_event`` is set; nothing is logged while the
    probability table is empty.
    """
    while not stop_event.is_set():
        calculate_fill_probabilities()
        if fill_probabilities:
            logger.info("Fill Probabilities:\n%s", repr(fill_probabilities))
        stop_event.wait(5)  # Wake up every 5 seconds, or at once on stop.


# Start the periodic logging thread.
log_fill_probabilities_thread = threading.Thread(target=log_fill_probabilities_periodically)
log_fill_probabilities_thread.start()


def stop_all_threads():
    """Stop the logging thread and both WebSocket threads, then join them.

    The WebSocket apps are closed before joining so that ``run_forever``
    returns; without that the joins would block indefinitely.
    """
    stop_event.set()
    ws_trade.close()
    ws_depth.close()
    trade_thread.join()
    depth_thread.join()
    log_fill_probabilities_thread.join()


# Call this when everything should be shut down.
# stop_all_threads()