Parcourir la source

订单流的部分加入。

skyffire il y a 1 an
Parent
commit
90fb19fd24
2 fichiers modifiés avec 230 ajouts et 2 suppressions
  1. 58 2
      binance_gp_demo.py
  2. 172 0
      binance_order_flow.py

+ 58 - 2
binance_gp_demo.py

@@ -140,9 +140,65 @@ def update_transition_matrix_periodically():
         calculate_and_classify_spread()
         update_transition_matrix(df_trades)
         current_state = df_trades['state'].iloc[-1] if not df_trades.empty else 'unknown'
-        logger.info("Current State: %s\nTransition Matrix:\n%s\n", current_state, transition_matrix)
+        # logger.info("Current State: %s\nTransition Matrix:\n%s\n", current_state, transition_matrix)
         stop_event.wait(5)  # 每5秒更新一次
 
 # 创建并启动线程
 transition_matrix_update_thread = threading.Thread(target=update_transition_matrix_periodically)
-transition_matrix_update_thread.start()
+transition_matrix_update_thread.start()
+
+
+# 四、参数估计,我们将通过订单簿数据估计成交密度。
+# 用于存储前一个订单簿的全局变量
+previous_order_book = None
+# Function to estimate fill probabilities and order flow based on order book data and trades
+def estimate_fill_probabilities_and_order_flow(order_book, trades):
+    global previous_order_book
+    fill_probabilities = {}
+    order_flow = {'new_orders': 0, 'cancellations': 0, 'executions': 0}
+
+    if previous_order_book is not None:
+        for level in range(10):  # 取前10档数据
+            bid_price = order_book['bid_price'].iloc[0][level]
+            ask_price = order_book['ask_price'].iloc[0][level]
+            bid_qty = order_book['bid_qty'].iloc[0][level]
+            ask_qty = order_book['ask_qty'].iloc[0][level]
+
+            prev_bid_qty = previous_order_book['bid_qty'].iloc[0][level]
+            prev_ask_qty = previous_order_book['ask_qty'].iloc[0][level]
+
+            # 计算成交量占订单量的比例
+            fill_bid = trades[(trades['price'] <= bid_price) & (trades['price'] > bid_price - 0.01)]
+            fill_ask = trades[(trades['price'] >= ask_price) & (trades['price'] < ask_price + 0.01)]
+
+            fill_probabilities[f'bid_{level+1}'] = fill_bid['qty'].sum() / bid_qty if bid_qty > 0 else 0
+            fill_probabilities[f'ask_{level+1}'] = fill_ask['qty'].sum() / ask_qty if ask_qty > 0 else 0
+
+            # 计算订单流
+            new_bid_orders = max(0, bid_qty - prev_bid_qty)
+            new_ask_orders = max(0, ask_qty - prev_ask_qty)
+            cancellations_bid = max(0, prev_bid_qty - bid_qty)
+            cancellations_ask = max(0, prev_ask_qty - ask_qty)
+
+            order_flow['new_orders'] += new_bid_orders + new_ask_orders
+            order_flow['cancellations'] += cancellations_bid + cancellations_ask
+            order_flow['executions'] += fill_bid['qty'].sum() + fill_ask['qty'].sum()
+
+    # 更新 previous_order_book
+    previous_order_book = order_book.copy()
+
+    return fill_probabilities, order_flow
+
+# Estimate fill probabilities and order flow periodically
+def estimate_fill_probabilities_and_order_flow_periodically():
+    global df_order_book
+    while not stop_event.is_set():
+        if not df_order_book.empty and not df_trades.empty:
+            fill_probabilities, order_flow = estimate_fill_probabilities_and_order_flow(df_order_book, df_trades)
+            logger.info("Fill Probabilities:\n%s", fill_probabilities)
+            logger.info("Order Flow:\n%s\n", order_flow)
+        stop_event.wait(5)  # 每10秒更新一次
+
+# 创建并启动线程
+fill_probabilities_thread = threading.Thread(target=estimate_fill_probabilities_and_order_flow_periodically)
+fill_probabilities_thread.start()

+ 172 - 0
binance_order_flow.py

@@ -0,0 +1,172 @@
+import threading
+import websocket
+import json
+import pandas as pd
+import numpy as np
+import time
+import warnings
+import logging
+import colorlog
+
+# 忽略 FutureWarning
+warnings.simplefilter(action='ignore', category=FutureWarning)
+
+# 配置日志
+handler = colorlog.StreamHandler()
+handler.setFormatter(colorlog.ColoredFormatter(
+    "%(log_color)s%(asctime)s - %(name)s - %(levelname)s \n %(message)s",
+    datefmt=None,
+    reset=True,
+    log_colors={
+        'DEBUG': 'cyan',
+        'INFO': 'blue',
+        'WARNING': 'yellow',
+        'ERROR': 'red',
+        'CRITICAL': 'bold_red',
+    }
+))
+logger = logging.getLogger("market_monitor")
+logger.setLevel(logging.INFO)
+logger.addHandler(handler)
+
+# Binance WebSocket API URL
+SOCKET_TRADE = "wss://stream.binance.com:9443/ws/btcusdt@trade"
+SOCKET_DEPTH = "wss://stream.binance.com:9443/ws/btcusdt@depth20@100ms"
+
+# Initialize the DataFrame
+df_trades = pd.DataFrame(columns=['price', 'qty', 'timestamp'])
+df_order_book = pd.DataFrame(columns=['bid_price', 'bid_qty', 'ask_price', 'ask_qty'])
+
+previous_order_book = None
+fill_probabilities = {}
+order_disappearances = {}
+order_executions = {}
+
+
+def on_message_trade(_ws, message):
+    global df_trades, order_executions
+    json_message = json.loads(message)
+    trade = {
+        'price': float(json_message['p']),
+        'qty': float(json_message['q']),
+        'timestamp': pd.to_datetime(json_message['T'], unit='ms')
+    }
+    trade_df = pd.DataFrame([trade])
+    if not trade_df.empty and not trade_df.isna().all().all():
+        df_trades = pd.concat([df_trades, trade_df], ignore_index=True)
+
+        # 记录每个价格的实际成交总量
+        price = trade['price']
+        if price not in order_executions:
+            order_executions[price] = 0
+        order_executions[price] += trade['qty']
+
+
+def on_message_depth(_ws, message):
+    global df_order_book, order_disappearances, previous_order_book
+    json_message = json.loads(message)
+    bids = json_message['bids'][:10]  # Top 10 bids
+    asks = json_message['asks'][:10]  # Top 10 asks
+    order_book = {
+        'bid_price': [float(bid[0]) for bid in bids],
+        'bid_qty': [float(bid[1]) for bid in bids],
+        'ask_price': [float(ask[0]) for ask in asks],
+        'ask_qty': [float(ask[1]) for ask in asks]
+    }
+    current_order_book = pd.DataFrame([order_book])
+
+    if previous_order_book is not None:
+        # 计算订单消失量
+        for level in range(10):
+            bid_price = current_order_book['bid_price'].iloc[0][level]
+            ask_price = current_order_book['ask_price'].iloc[0][level]
+            bid_qty = current_order_book['bid_qty'].iloc[0][level]
+            ask_qty = current_order_book['ask_qty'].iloc[0][level]
+
+            prev_bid_qty = previous_order_book['bid_qty'].iloc[0][level]
+            prev_ask_qty = previous_order_book['ask_qty'].iloc[0][level]
+
+            # 计算bid订单消失量
+            if bid_price not in order_disappearances:
+                order_disappearances[bid_price] = 0
+            if prev_bid_qty > bid_qty:
+                order_disappearances[bid_price] += (prev_bid_qty - bid_qty)
+
+            # 计算ask订单消失量
+            if ask_price not in order_disappearances:
+                order_disappearances[ask_price] = 0
+            if prev_ask_qty > ask_qty:
+                order_disappearances[ask_price] += (prev_ask_qty - ask_qty)
+
+    previous_order_book = current_order_book
+
+
+def on_error(_ws, error):
+    logger.error(error)
+
+
+def on_open(_ws):
+    print("### opened ###")
+
+
+# Create a WebSocket app
+ws_trade = websocket.WebSocketApp(SOCKET_TRADE, on_message=on_message_trade, on_error=on_error)
+ws_depth = websocket.WebSocketApp(SOCKET_DEPTH, on_message=on_message_depth, on_error=on_error)
+
+# 定义要传递给 run_forever 的参数
+http_proxy_host = "127.0.0.1"
+http_proxy_port = 7890
+proxy_type = "http"
+# Run the WebSocket with proxy settings
+trade_thread = threading.Thread(target=ws_trade.run_forever, kwargs={
+    'http_proxy_host': http_proxy_host,
+    'http_proxy_port': http_proxy_port,
+    'proxy_type': proxy_type
+})
+depth_thread = threading.Thread(target=ws_depth.run_forever, kwargs={
+    'http_proxy_host': http_proxy_host,
+    'http_proxy_port': http_proxy_port,
+    'proxy_type': proxy_type
+})
+trade_thread.start()
+depth_thread.start()
+
+stop_event = threading.Event()
+
+
+# Function to calculate fill probabilities
+def calculate_fill_probabilities():
+    global order_executions, order_disappearances, fill_probabilities
+    fill_probabilities = {}
+    for price in order_disappearances:
+        if price in order_executions:
+            disappearances = order_disappearances[price]
+            executions = order_executions[price]
+            # 确保成交概率不大于1
+            fill_probabilities[price] = min(executions / disappearances, 1) if disappearances > 0 else 0
+        else:
+            fill_probabilities[price] = 0
+
+
+# Function to periodically log fill probabilities
+def log_fill_probabilities_periodically():
+    while not stop_event.is_set():
+        calculate_fill_probabilities()
+        if fill_probabilities:
+            logger.info("Fill Probabilities:\n%s", repr(fill_probabilities))
+        stop_event.wait(5)  # 每5秒打印一次
+
+
+# 启动定期打印线程
+log_fill_probabilities_thread = threading.Thread(target=log_fill_probabilities_periodically)
+log_fill_probabilities_thread.start()
+
+
+def stop_all_threads():
+    stop_event.set()
+    trade_thread.join()
+    depth_thread.join()
+    log_fill_probabilities_thread.join()
+
+# 停止所有线程(在需要停止时调用)
+# stop_all_threads()