瀏覽代碼

订阅binance合约,修复一些小问题。

skyffire 1 年之前
父節點
當前提交
78209fb0a7

+ 30 - 34
binance_order_flow/data_processing.py

@@ -25,10 +25,10 @@ def on_message_trade(_ws, message):
     global df_trades, order_executions, last_trade
     json_message = json.loads(message)
     trade = {
-        'price': float(json_message['p']),
-        'qty': float(json_message['q']),
-        'timestamp': pd.to_datetime(json_message['T'], unit='ms'),
-        'side': 'buy' if json_message['m'] else 'sell'  # 'm' indicates是否买方是做市商
+        'price': float(json_message['data']['p']),
+        'qty': float(json_message['data']['q']),
+        'timestamp': pd.to_datetime(json_message['data']['T'], unit='ms'),
+        'side': 'buy' if json_message['data']['m'] else 'sell'  # 'm' indicates是否买方是做市商
     }
     trade_df = pd.DataFrame([trade])
     if not trade_df.empty and not trade_df.isna().all().all():
@@ -46,8 +46,8 @@ def on_message_trade(_ws, message):
 def on_message_depth(_ws, message):
     global df_order_book, order_disappearances, previous_order_book
     json_message = json.loads(message)
-    bids = json_message['bids'][:10]  # Top 10 bids
-    asks = json_message['asks'][:10]  # Top 10 asks
+    bids = json_message['data']['b'][:10]  # Top 10 bids
+    asks = json_message['data']['a'][:10]  # Top 10 asks
     order_book = {
         'bid_price': [float(bid[0]) for bid in bids],
         'bid_qty': [float(bid[1]) for bid in bids],
@@ -87,39 +87,35 @@ def on_message_depth(_ws, message):
 
 # 计算成交概率
 def calculate_fill_probabilities():
-    global order_executions, order_disappearances, fill_probabilities
-    fill_probabilities = {}
+    global order_executions, order_disappearances, fill_probabilities, df_order_book
+
     for price in order_disappearances:
         if price in order_executions:
             disappearances = order_disappearances[price]
             executions = order_executions[price]
             # 确保成交概率不大于1
-            fill_probabilities[price] = min(executions / disappearances, 1) if disappearances > 0 else 0
+            fill_probabilities[price] = executions / disappearances if disappearances > 0 else 0
         else:
             fill_probabilities[price] = 0
 
-
-# 定期记录和保存成交概率
-def log_and_save_fill_probabilities():
-    global df_order_book, fill_probabilities
-
-    while not stop_event.is_set():
-        if fill_probabilities and not df_order_book.empty:
-            asks = [[price, fill_probabilities[price]] for price in df_order_book['ask_price'].iloc[0] if
-                    price in fill_probabilities]
-            bids = [[price, fill_probabilities[price]] for price in df_order_book['bid_price'].iloc[0] if
-                    price in fill_probabilities]
-            last_price = last_trade['price']
-            last_qty = last_trade['qty']
-            side = last_trade['side']
-            data = {
-                "asks": asks,
-                "bids": bids,
-                "last_price": last_price,
-                "last_qty": last_qty,
-                "side": side,
-                "time": int(time.time() * 1000)
-            }
-            messages.put(data)
-            # logger.info("Market Snapshot:\n%s", json.dumps(data))
-        stop_event.wait(0.1)
+    if fill_probabilities and not df_order_book.empty and last_trade['price'] is not None:
+        last_price = last_trade['price']
+        asks = [[price, fill_probabilities[price]] for price in fill_probabilities.keys() if
+                price > last_price and price in fill_probabilities]
+        bids = [[price, fill_probabilities[price]] for price in fill_probabilities.keys() if
+                price < last_price and price in fill_probabilities]
+        asks_sorted = sorted(asks, key=lambda x: x[0])
+        bids_sorted = sorted(bids, key=lambda x: x[0], reverse=True)
+
+        # last_qty = last_trade['qty']
+        last_qty = 0
+        side = last_trade['side']
+        data = {
+            "asks": asks_sorted,
+            "bids": bids_sorted,
+            "last_price": last_price,
+            "last_qty": last_qty,
+            "side": side,
+            "time": int(time.time() * 1000)
+        }
+        messages.put(data)

+ 2 - 1
binance_order_flow/logger_config.py

@@ -4,7 +4,7 @@ import colorlog
 # 配置日志
 handler = colorlog.StreamHandler()
 handler.setFormatter(colorlog.ColoredFormatter(
-    "%(log_color)s%(asctime)s - %(name)s - %(levelname)s \n %(message)s",
+    fmt="%(log_color)s%(asctime)s - %(name)s - %(levelname)s - %(filename)s:%(lineno)d \n %(message)s",
     datefmt=None,
     reset=True,
     log_colors={
@@ -15,6 +15,7 @@ handler.setFormatter(colorlog.ColoredFormatter(
         'CRITICAL': 'bold_red',
     }
 ))
+
 logger = logging.getLogger("market_monitor")
 logger.setLevel(logging.INFO)
 logger.addHandler(handler)

+ 4 - 5
binance_order_flow/main.py

@@ -2,15 +2,14 @@ import threading
 import asyncio
 from ws_client import start_ws_clients, stop_all_threads
 from ws_server import start_websocket_server
-from data_processing import log_and_save_fill_probabilities
+import warnings
+
+# 忽略 FutureWarning
+warnings.simplefilter(action='ignore', category=FutureWarning)
 
 # 启动 WebSocket 客户端
 start_ws_clients()
 
-# 启动定期保存和打印线程
-log_and_save_thread = threading.Thread(target=log_and_save_fill_probabilities)
-log_and_save_thread.start()
-
 # 启动 WebSocket 服务器
 asyncio.run(start_websocket_server())
 

+ 3 - 2
binance_order_flow/ws_client.py

@@ -5,8 +5,9 @@ from logger_config import logger
 from data_processing import on_message_trade, on_message_depth, stop_event
 
 # Binance WebSocket API URL
-SOCKET_TRADE = "wss://stream.binance.com:9443/ws/btcusdt@trade"
-SOCKET_DEPTH = "wss://stream.binance.com:9443/ws/btcusdt@depth20@100ms"
+SYMBOL = "tokenusdt"
+SOCKET_TRADE = "wss://fstream.binance.com/stream?streams=" + SYMBOL + "@trade"
+SOCKET_DEPTH = "wss://fstream.binance.com/stream?streams=" + SYMBOL + "@depth20@100ms"
 
 
 def on_error(_ws, error):