瀏覽代碼

加入ws(跑不起来。)

skyffire 1 年之前
父節點
當前提交
2501ae05f4
共有 1 個文件被更改,包括 75 次插入24 次删除
  1. 75 24
      binance_order_flow.py

+ 75 - 24
binance_order_flow.py

@@ -1,3 +1,5 @@
+import asyncio
+import queue
 import threading
 import websocket
 import json
@@ -5,6 +7,7 @@ import pandas as pd
 import warnings
 import logging
 import colorlog
+import websockets
 
 # 忽略 FutureWarning
 warnings.simplefilter(action='ignore', category=FutureWarning)
@@ -39,16 +42,18 @@ previous_order_book = None
 fill_probabilities = {}
 order_disappearances = {}
 order_executions = {}
-last_trade_price = None
+last_trade = {'price': None, 'qty': None, 'side': None}
+messages = queue.Queue()                        # 创建一个线程安全队列
 
 
 def on_message_trade(_ws, message):
-    global df_trades, order_executions, last_trade_price
+    global df_trades, order_executions, last_trade
     json_message = json.loads(message)
     trade = {
         'price': float(json_message['p']),
         'qty': float(json_message['q']),
-        'timestamp': pd.to_datetime(json_message['T'], unit='ms')
+        'timestamp': pd.to_datetime(json_message['T'], unit='ms'),
+        'side': 'buy' if json_message['m'] else 'sell'  # 'm' indicates whether the buyer is the market maker
     }
     trade_df = pd.DataFrame([trade])
     if not trade_df.empty and not trade_df.isna().all().all():
@@ -56,10 +61,11 @@ def on_message_trade(_ws, message):
 
         # 记录每个价格的实际成交总量
         price = trade['price']
-        last_trade_price = price
+        last_trade = {'price': price, 'qty': trade['qty'], 'side': trade['side']}
         if price not in order_executions:
             order_executions[price] = 0
         order_executions[price] += trade['qty']
+        calculate_fill_probabilities()
 
 
 def on_message_depth(_ws, message):
@@ -73,15 +79,15 @@ def on_message_depth(_ws, message):
         'ask_price': [float(ask[0]) for ask in asks],
         'ask_qty': [float(ask[1]) for ask in asks]
     }
-    current_order_book = pd.DataFrame([order_book])
+    df_order_book = pd.DataFrame([order_book])
 
     if previous_order_book is not None:
         # 计算订单消失量
         for level in range(10):
-            bid_price = current_order_book['bid_price'].iloc[0][level]
-            ask_price = current_order_book['ask_price'].iloc[0][level]
-            bid_qty = current_order_book['bid_qty'].iloc[0][level]
-            ask_qty = current_order_book['ask_qty'].iloc[0][level]
+            bid_price = df_order_book['bid_price'].iloc[0][level]
+            ask_price = df_order_book['ask_price'].iloc[0][level]
+            bid_qty = df_order_book['bid_qty'].iloc[0][level]
+            ask_qty = df_order_book['ask_qty'].iloc[0][level]
 
             prev_bid_qty = previous_order_book['bid_qty'].iloc[0][level]
             prev_ask_qty = previous_order_book['ask_qty'].iloc[0][level]
@@ -98,7 +104,8 @@ def on_message_depth(_ws, message):
             if prev_ask_qty > ask_qty:
                 order_disappearances[ask_price] += (prev_ask_qty - ask_qty)
 
-    previous_order_book = current_order_book
+    previous_order_book = df_order_book
+    calculate_fill_probabilities()
 
 
 def on_error(_ws, error):
@@ -148,25 +155,69 @@ def calculate_fill_probabilities():
             fill_probabilities[price] = 0
 
 
-# Function to periodically log fill probabilities
-def log_fill_probabilities_periodically():
-    while not stop_event.is_set():
-        calculate_fill_probabilities()
-        if fill_probabilities:
-            logger.info("Fill Probabilities:\n%s", repr(fill_probabilities))
-        stop_event.wait(5)  # 每5秒打印一次
-
-
-# 启动定期打印线程
-log_fill_probabilities_thread = threading.Thread(target=log_fill_probabilities_periodically)
-log_fill_probabilities_thread.start()
+# Function to periodically log and save fill probabilities
+# Function to periodically log and save fill probabilities
+def log_and_save_fill_probabilities():
+    global df_order_book, fill_probabilities
 
+    while not stop_event.is_set():
+        if fill_probabilities and not df_order_book.empty:
+            asks = [[price, fill_probabilities[price]] for price in df_order_book['ask_price'].iloc[0] if price in fill_probabilities]
+            bids = [[price, fill_probabilities[price]] for price in df_order_book['bid_price'].iloc[0] if price in fill_probabilities]
+            last_price = last_trade['price']
+            last_qty = last_trade['qty']
+            side = last_trade['side']
+            data = {
+                "asks": asks,
+                "bids": bids,
+                "last_price": last_price,
+                "last_qty": last_qty,
+                "side": side
+            }
+            messages.put(data)
+            # logger.info("Market Snapshot:\n%s", json.dumps(data))
+        stop_event.wait(1)  # 每5秒更新一次
+
+# 启动定期保存和打印线程
+log_and_save_thread = threading.Thread(target=log_and_save_fill_probabilities)
+log_and_save_thread.start()
+
+
+async def ws_inited(ws, path):
+    logger.info("客户端已连接上,ws://localhost:6789初始化完毕")
+    while True:
+        message = await asyncio.get_running_loop().run_in_executor(None, messages.get)
+
+        # 检查连接是否仍然开放
+        if ws.open:
+            message_data = json.dumps(message)  # 将字典序列化为JSON字符串
+            await ws.send(message_data)
+        else:
+            logger.info("WebSocket 连接已关闭")
+            break
+
+def start_websocket_server():
+    loop = asyncio.new_event_loop()
+    asyncio.set_event_loop(loop)
+    try:
+        loop.run_until_complete(
+            websockets.serve(ws_inited, "localhost", 6789)
+        )
+        logger.info("WebSocket 服务器启动在 ws://localhost:6789")  # 直接使用 print 输出
+        loop.run_forever()
+    except Exception as e:
+        logger.error(f"无法启动WebSocket服务器: {e}")  # 输出错误信息
+    finally:
+        loop.close()
+
+# 在新线程中启动 WebSocket 服务器
+threading.Thread(target=start_websocket_server, daemon=True).start()
 
 def stop_all_threads():
     stop_event.set()
     trade_thread.join()
     depth_thread.join()
-    log_fill_probabilities_thread.join()
+    log_and_save_thread.join()
 
 # 停止所有线程(在需要停止时调用)
-# stop_all_threads()
+# stop_all_threads()