| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125 |
- import json
- import pandas as pd
- import time
- import queue
- import threading
- from logger_config import logger
- # Initialize the DataFrame
- df_trades = pd.DataFrame(columns=['price', 'qty', 'timestamp'])
- df_order_book = pd.DataFrame(columns=['bid_price', 'bid_qty', 'ask_price', 'ask_qty'])
- previous_order_book = None
- fill_probabilities = {}
- order_disappearances = {}
- order_executions = {}
- last_trade = {'price': None, 'qty': None, 'side': None}
- messages = queue.Queue() # 创建一个线程安全队列
- stop_event = threading.Event()
- def on_message_trade(_ws, message):
- global df_trades, order_executions, last_trade
- json_message = json.loads(message)
- trade = {
- 'price': float(json_message['p']),
- 'qty': float(json_message['q']),
- 'timestamp': pd.to_datetime(json_message['T'], unit='ms'),
- 'side': 'buy' if json_message['m'] else 'sell' # 'm' indicates是否买方是做市商
- }
- trade_df = pd.DataFrame([trade])
- if not trade_df.empty and not trade_df.isna().all().all():
- df_trades = pd.concat([df_trades, trade_df], ignore_index=True)
- # 记录每个价格的实际成交总量
- price = trade['price']
- last_trade = {'price': price, 'qty': trade['qty'], 'side': trade['side']}
- if price not in order_executions:
- order_executions[price] = 0
- order_executions[price] += trade['qty']
- calculate_fill_probabilities()
- def on_message_depth(_ws, message):
- global df_order_book, order_disappearances, previous_order_book
- json_message = json.loads(message)
- bids = json_message['bids'][:10] # Top 10 bids
- asks = json_message['asks'][:10] # Top 10 asks
- order_book = {
- 'bid_price': [float(bid[0]) for bid in bids],
- 'bid_qty': [float(bid[1]) for bid in bids],
- 'ask_price': [float(ask[0]) for ask in asks],
- 'ask_qty': [float(ask[1]) for ask in asks]
- }
- df_order_book = pd.DataFrame([order_book])
- if previous_order_book is not None:
- # 计算订单消失量
- for level in range(10):
- bid_price = df_order_book['bid_price'].iloc[0][level]
- ask_price = df_order_book['ask_price'].iloc[0][level]
- bid_qty = df_order_book['bid_qty'].iloc[0][level]
- ask_qty = df_order_book['ask_qty'].iloc[0][level]
- prev_bid_qty = previous_order_book['bid_qty'].iloc[0][level]
- prev_ask_qty = previous_order_book['ask_qty'].iloc[0][level]
- # 计算bid订单消失量
- if bid_price not in order_disappearances:
- order_disappearances[bid_price] = 0
- if prev_bid_qty > bid_qty:
- disappearances = (prev_bid_qty - bid_qty)
- order_disappearances[bid_price] += disappearances if disappearances > 0 else 0
- # 计算ask订单消失量
- if ask_price not in order_disappearances:
- order_disappearances[ask_price] = 0
- if prev_ask_qty > ask_qty:
- disappearances = (prev_ask_qty - ask_qty)
- order_disappearances[ask_price] += disappearances if disappearances > 0 else 0
- previous_order_book = df_order_book
- calculate_fill_probabilities()
- # 计算成交概率
- def calculate_fill_probabilities():
- global order_executions, order_disappearances, fill_probabilities
- fill_probabilities = {}
- for price in order_disappearances:
- if price in order_executions:
- disappearances = order_disappearances[price]
- executions = order_executions[price]
- # 确保成交概率不大于1
- fill_probabilities[price] = min(executions / disappearances, 1) if disappearances > 0 else 0
- else:
- fill_probabilities[price] = 0
- # 定期记录和保存成交概率
- def log_and_save_fill_probabilities():
- global df_order_book, fill_probabilities
- while not stop_event.is_set():
- if fill_probabilities and not df_order_book.empty:
- asks = [[price, fill_probabilities[price]] for price in df_order_book['ask_price'].iloc[0] if
- price in fill_probabilities]
- bids = [[price, fill_probabilities[price]] for price in df_order_book['bid_price'].iloc[0] if
- price in fill_probabilities]
- last_price = last_trade['price']
- last_qty = last_trade['qty']
- side = last_trade['side']
- data = {
- "asks": asks,
- "bids": bids,
- "last_price": last_price,
- "last_qty": last_qty,
- "side": side,
- "time": int(time.time() * 1000)
- }
- messages.put(data)
- # logger.info("Market Snapshot:\n%s", json.dumps(data))
- stop_event.wait(0.1)
|