import json import pandas as pd import time import queue import threading from logger_config import logger # Initialize the DataFrame df_trades = pd.DataFrame(columns=['price', 'qty', 'timestamp']) df_order_book = pd.DataFrame(columns=['bid_price', 'bid_qty', 'ask_price', 'ask_qty']) previous_order_book = None fill_probabilities = {} order_disappearances = {} order_executions = {} last_trade = {'price': None, 'qty': None, 'side': None} messages = queue.Queue() # 创建一个线程安全队列 stop_event = threading.Event() def on_message_trade(_ws, message): global df_trades, order_executions, last_trade json_message = json.loads(message) trade = { 'price': float(json_message['data']['p']), 'qty': float(json_message['data']['q']), 'timestamp': pd.to_datetime(json_message['data']['T'], unit='ms'), 'side': 'buy' if json_message['data']['m'] else 'sell' # 'm' indicates是否买方是做市商 } trade_df = pd.DataFrame([trade]) if not trade_df.empty and not trade_df.isna().all().all(): df_trades = pd.concat([df_trades, trade_df], ignore_index=True) # 记录每个价格的实际成交总量 price = trade['price'] last_trade = {'price': price, 'qty': trade['qty'], 'side': trade['side']} if price not in order_executions: order_executions[price] = 0 order_executions[price] += trade['qty'] calculate_fill_probabilities() def on_message_depth(_ws, message): global df_order_book, order_disappearances, previous_order_book json_message = json.loads(message) bids = json_message['data']['b'][:10] # Top 10 bids asks = json_message['data']['a'][:10] # Top 10 asks order_book = { 'bid_price': [float(bid[0]) for bid in bids], 'bid_qty': [float(bid[1]) for bid in bids], 'ask_price': [float(ask[0]) for ask in asks], 'ask_qty': [float(ask[1]) for ask in asks] } df_order_book = pd.DataFrame([order_book]) if previous_order_book is not None: # 计算订单消失量 for level in range(10): bid_price = df_order_book['bid_price'].iloc[0][level] ask_price = df_order_book['ask_price'].iloc[0][level] bid_qty = df_order_book['bid_qty'].iloc[0][level] ask_qty = df_order_book['ask_qty'].iloc[0][level] prev_bid_qty = previous_order_book['bid_qty'].iloc[0][level] prev_ask_qty = previous_order_book['ask_qty'].iloc[0][level] # 计算bid订单消失量 if bid_price not in order_disappearances: order_disappearances[bid_price] = 0 if prev_bid_qty > bid_qty: disappearances = (prev_bid_qty - bid_qty) order_disappearances[bid_price] += disappearances if disappearances > 0 else 0 # 计算ask订单消失量 if ask_price not in order_disappearances: order_disappearances[ask_price] = 0 if prev_ask_qty > ask_qty: disappearances = (prev_ask_qty - ask_qty) order_disappearances[ask_price] += disappearances if disappearances > 0 else 0 previous_order_book = df_order_book calculate_fill_probabilities() # 计算成交概率 def calculate_fill_probabilities(): global order_executions, order_disappearances, fill_probabilities, df_order_book for price in order_disappearances: if price in order_executions: disappearances = order_disappearances[price] executions = order_executions[price] # 确保成交概率不大于1 fill_probabilities[price] = executions / disappearances if disappearances > 0 else 0 else: fill_probabilities[price] = 0 if fill_probabilities and not df_order_book.empty and last_trade['price'] is not None: last_price = last_trade['price'] asks = [[price, fill_probabilities[price]] for price in fill_probabilities.keys() if price > last_price and price in fill_probabilities] bids = [[price, fill_probabilities[price]] for price in fill_probabilities.keys() if price < last_price and price in fill_probabilities] asks_sorted = sorted(asks, key=lambda x: x[0]) bids_sorted = sorted(bids, key=lambda x: x[0], reverse=True) # last_qty = last_trade['qty'] last_qty = 0 side = last_trade['side'] data = { "asks": asks_sorted, "bids": bids_sorted, "last_price": last_price, "last_qty": last_qty, "side": side, "time": int(time.time() * 1000) } messages.put(data)