Przeglądaj źródła

更新了逻辑结构,虽然还是很乱,但是……将就吧。

skyffire 1 rok temu
rodzic
commit
d3965cccee

+ 0 - 204
binance_gp_demo.py

@@ -1,204 +0,0 @@
-import threading
-
-import websocket
-import json
-import pandas as pd
-import numpy as np
-from scipy.optimize import minimize
-import time
-import warnings
-import logging
-import colorlog
-
-# 忽略 FutureWarning
-warnings.simplefilter(action='ignore', category=FutureWarning)
-
-# 配置日志
-handler = colorlog.StreamHandler()
-handler.setFormatter(colorlog.ColoredFormatter(
-    "%(log_color)s%(asctime)s - %(name)s - %(levelname)s \n %(message)s",
-    datefmt=None,
-    reset=True,
-    log_colors={
-        'DEBUG': 'cyan',
-        'INFO': 'blue',
-        'WARNING': 'yellow',
-        'ERROR': 'red',
-        'CRITICAL': 'bold_red',
-    }
-))
-logger = logging.getLogger("binance_gp_demo")
-logger.setLevel(logging.INFO)
-logger.addHandler(handler)
-
-# 步骤二:订阅Binance的成交数据和订单簿数据
-# Binance WebSocket API URL
-SOCKET_TRADE = "wss://stream.binance.com:9443/ws/btcusdt@trade"
-SOCKET_DEPTH = "wss://stream.binance.com:9443/ws/btcusdt@depth20@100ms"
-
-# Initialize the DataFrame
-df_trades = pd.DataFrame(columns=['price', 'qty', 'timestamp'])
-df_order_book = pd.DataFrame(columns=['bid_price', 'bid_qty', 'ask_price', 'ask_qty'])
-
-
-def on_message_trade(_ws, message):
-    global df_trades
-    json_message = json.loads(message)
-    trade = {
-        'price': float(json_message['p']),
-        'qty': float(json_message['q']),
-        'timestamp': pd.to_datetime(json_message['T'], unit='ms')
-    }
-    trade_df = pd.DataFrame([trade])
-    if not trade_df.empty and not trade_df.isna().all().all():
-        df_trades = pd.concat([df_trades, trade_df], ignore_index=True)
-
-
-# Function to handle order book messages
-def on_message_depth(_ws, message):
-    global df_order_book
-    json_message = json.loads(message)
-    bids = json_message['bids'][:10]  # Top 10 bids
-    asks = json_message['asks'][:10]  # Top 10 asks
-    order_book = {
-        'bid_price': [float(bid[0]) for bid in bids],
-        'bid_qty': [float(bid[1]) for bid in bids],
-        'ask_price': [float(ask[0]) for ask in asks],
-        'ask_qty': [float(ask[1]) for ask in asks]
-    }
-    df_order_book = pd.DataFrame([order_book])
-
-
-def on_error(_ws, error):
-    logger.error(error)
-
-
-def on_open(_ws):
-    print("### opened ###")
-
-
-# Create a WebSocket app
-ws_trade = websocket.WebSocketApp(SOCKET_TRADE, on_message=on_message_trade, on_error=on_error)
-ws_depth = websocket.WebSocketApp(SOCKET_DEPTH, on_message=on_message_depth, on_error=on_error)
-
-# 定义要传递给 run_forever 的参数
-http_proxy_host = "127.0.0.1"
-http_proxy_port = 7890
-proxy_type = "http"
-# Run the WebSocket with proxy settings
-trade_thread = threading.Thread(target=ws_trade.run_forever, kwargs={
-    'http_proxy_host': http_proxy_host,
-    'http_proxy_port': http_proxy_port,
-    'proxy_type': proxy_type
-})
-depth_thread = threading.Thread(target=ws_depth.run_forever, kwargs={
-    'http_proxy_host': http_proxy_host,
-    'http_proxy_port': http_proxy_port,
-    'proxy_type': proxy_type
-})
-trade_thread.start()
-depth_thread.start()
-
-# 步骤三:价差随机过程建模
-# Define the states and transition matrix
-states = ['tight', 'normal', 'wide']
-transition_matrix = np.zeros((3, 3))
-
-
-# 定义函数来更新转移矩阵
-def update_transition_matrix(df):
-    global transition_matrix
-    for i in range(len(df) - 1):
-        current_state = df['state'].iloc[i]
-        next_state = df['state'].iloc[i + 1]
-        transition_matrix[states.index(current_state), states.index(next_state)] += 1
-
-    row_sums = transition_matrix.sum(axis=1, keepdims=True)
-    row_sums[row_sums == 0] = 1
-    transition_matrix = transition_matrix / row_sums
-
-# 定义函数来分类价差状态
-def classify_spread(spread):
-    if spread < 0.01:
-        return 'tight'
-    elif spread < 0.02:
-        return 'normal'
-    else:
-        return 'wide'
-
-# 定义函数来计算价差并进行分类
-def calculate_and_classify_spread():
-    global df_trades
-    df_trades['spread'] = df_trades['price'].diff().abs()
-    df_trades['state'] = df_trades['spread'].apply(classify_spread)
-
-# 定义周期性更新转移矩阵的函数
-stop_event = threading.Event()
-
-def update_transition_matrix_periodically():
-    while not stop_event.is_set():
-        calculate_and_classify_spread()
-        update_transition_matrix(df_trades)
-        current_state = df_trades['state'].iloc[-1] if not df_trades.empty else 'unknown'
-        # logger.info("Current State: %s\nTransition Matrix:\n%s\n", current_state, transition_matrix)
-        stop_event.wait(5)  # 每5秒更新一次
-
-# 创建并启动线程
-transition_matrix_update_thread = threading.Thread(target=update_transition_matrix_periodically)
-transition_matrix_update_thread.start()
-
-
-# 四、参数估计,我们将通过订单簿数据估计成交密度。
-# 用于存储前一个订单簿的全局变量
-previous_order_book = None
-# Function to estimate fill probabilities and order flow based on order book data and trades
-def estimate_fill_probabilities_and_order_flow(order_book, trades):
-    global previous_order_book
-    fill_probabilities = {}
-    order_flow = {'new_orders': 0, 'cancellations': 0, 'executions': 0}
-
-    if previous_order_book is not None:
-        for level in range(10):  # 取前10档数据
-            bid_price = order_book['bid_price'].iloc[0][level]
-            ask_price = order_book['ask_price'].iloc[0][level]
-            bid_qty = order_book['bid_qty'].iloc[0][level]
-            ask_qty = order_book['ask_qty'].iloc[0][level]
-
-            prev_bid_qty = previous_order_book['bid_qty'].iloc[0][level]
-            prev_ask_qty = previous_order_book['ask_qty'].iloc[0][level]
-
-            # 计算成交量占订单量的比例
-            fill_bid = trades[(trades['price'] <= bid_price) & (trades['price'] > bid_price - 0.01)]
-            fill_ask = trades[(trades['price'] >= ask_price) & (trades['price'] < ask_price + 0.01)]
-
-            fill_probabilities[f'bid_{level+1}'] = fill_bid['qty'].sum() / bid_qty if bid_qty > 0 else 0
-            fill_probabilities[f'ask_{level+1}'] = fill_ask['qty'].sum() / ask_qty if ask_qty > 0 else 0
-
-            # 计算订单流
-            new_bid_orders = max(0, bid_qty - prev_bid_qty)
-            new_ask_orders = max(0, ask_qty - prev_ask_qty)
-            cancellations_bid = max(0, prev_bid_qty - bid_qty)
-            cancellations_ask = max(0, prev_ask_qty - ask_qty)
-
-            order_flow['new_orders'] += new_bid_orders + new_ask_orders
-            order_flow['cancellations'] += cancellations_bid + cancellations_ask
-            order_flow['executions'] += fill_bid['qty'].sum() + fill_ask['qty'].sum()
-
-    # 更新 previous_order_book
-    previous_order_book = order_book.copy()
-
-    return fill_probabilities, order_flow
-
-# Estimate fill probabilities and order flow periodically
-def estimate_fill_probabilities_and_order_flow_periodically():
-    global df_order_book
-    while not stop_event.is_set():
-        if not df_order_book.empty and not df_trades.empty:
-            fill_probabilities, order_flow = estimate_fill_probabilities_and_order_flow(df_order_book, df_trades)
-            logger.info("Fill Probabilities:\n%s", fill_probabilities)
-            logger.info("Order Flow:\n%s\n", order_flow)
-        stop_event.wait(5)  # 每10秒更新一次
-
-# 创建并启动线程
-fill_probabilities_thread = threading.Thread(target=estimate_fill_probabilities_and_order_flow_periodically)
-fill_probabilities_thread.start()

+ 14 - 108
binance_order_flow.py → binance_order_flow/data_processing.py

@@ -1,39 +1,9 @@
-import asyncio
-import queue
-import threading
-import time
-import websocket
 import json
-import pandas as pd
-import warnings
-import logging
-import colorlog
-import websockets
-
-# 忽略 FutureWarning
-warnings.simplefilter(action='ignore', category=FutureWarning)
-
-# 配置日志
-handler = colorlog.StreamHandler()
-handler.setFormatter(colorlog.ColoredFormatter(
-    "%(log_color)s%(asctime)s - %(name)s - %(levelname)s \n %(message)s",
-    datefmt=None,
-    reset=True,
-    log_colors={
-        'DEBUG': 'cyan',
-        'INFO': 'blue',
-        'WARNING': 'yellow',
-        'ERROR': 'red',
-        'CRITICAL': 'bold_red',
-    }
-))
-logger = logging.getLogger("market_monitor")
-logger.setLevel(logging.INFO)
-logger.addHandler(handler)
 
-# Binance WebSocket API URL
-SOCKET_TRADE = "wss://stream.binance.com:9443/ws/btcusdt@trade"
-SOCKET_DEPTH = "wss://stream.binance.com:9443/ws/btcusdt@depth20@100ms"
+import pandas as pd
+import time
+import queue
+import threading
 
 # Initialize the DataFrame
 df_trades = pd.DataFrame(columns=['price', 'qty', 'timestamp'])
@@ -46,6 +16,8 @@ order_executions = {}
 last_trade = {'price': None, 'qty': None, 'side': None}
 messages = queue.Queue()  # 创建一个线程安全队列
 
+stop_event = threading.Event()
+
 
 def on_message_trade(_ws, message):
     global df_trades, order_executions, last_trade
@@ -54,7 +26,7 @@ def on_message_trade(_ws, message):
         'price': float(json_message['p']),
         'qty': float(json_message['q']),
         'timestamp': pd.to_datetime(json_message['T'], unit='ms'),
-        'side': 'buy' if json_message['m'] else 'sell'  # 'm' indicates whether the buyer is the market maker
+        'side': 'buy' if json_message['m'] else 'sell'  # 'm' indicates是否买方是做市商
     }
     trade_df = pd.DataFrame([trade])
     if not trade_df.empty and not trade_df.isna().all().all():
@@ -109,40 +81,7 @@ def on_message_depth(_ws, message):
     calculate_fill_probabilities()
 
 
-def on_error(_ws, error):
-    logger.error(error)
-
-
-def on_open(_ws):
-    print("### opened ###")
-
-
-# Create a WebSocket app
-ws_trade = websocket.WebSocketApp(SOCKET_TRADE, on_message=on_message_trade, on_error=on_error)
-ws_depth = websocket.WebSocketApp(SOCKET_DEPTH, on_message=on_message_depth, on_error=on_error)
-
-# 定义要传递给 run_forever 的参数
-http_proxy_host = "127.0.0.1"
-http_proxy_port = 7890
-proxy_type = "http"
-# Run the WebSocket with proxy settings
-trade_thread = threading.Thread(target=ws_trade.run_forever, kwargs={
-    'http_proxy_host': http_proxy_host,
-    'http_proxy_port': http_proxy_port,
-    'proxy_type': proxy_type
-})
-depth_thread = threading.Thread(target=ws_depth.run_forever, kwargs={
-    'http_proxy_host': http_proxy_host,
-    'http_proxy_port': http_proxy_port,
-    'proxy_type': proxy_type
-})
-trade_thread.start()
-depth_thread.start()
-
-stop_event = threading.Event()
-
-
-# Function to calculate fill probabilities
+# 计算成交概率
 def calculate_fill_probabilities():
     global order_executions, order_disappearances, fill_probabilities
     fill_probabilities = {}
@@ -156,14 +95,16 @@ def calculate_fill_probabilities():
             fill_probabilities[price] = 0
 
 
-# Function to periodically log and save fill probabilities
+# 定期记录和保存成交概率
 def log_and_save_fill_probabilities():
     global df_order_book, fill_probabilities
 
     while not stop_event.is_set():
         if fill_probabilities and not df_order_book.empty:
-            asks = [[price, fill_probabilities[price]] for price in df_order_book['ask_price'].iloc[0] if price in fill_probabilities]
-            bids = [[price, fill_probabilities[price]] for price in df_order_book['bid_price'].iloc[0] if price in fill_probabilities]
+            asks = [[price, fill_probabilities[price]] for price in df_order_book['ask_price'].iloc[0] if
+                    price in fill_probabilities]
+            bids = [[price, fill_probabilities[price]] for price in df_order_book['bid_price'].iloc[0] if
+                    price in fill_probabilities]
             last_price = last_trade['price']
             last_qty = last_trade['qty']
             side = last_trade['side']
@@ -177,39 +118,4 @@ def log_and_save_fill_probabilities():
             }
             messages.put(data)
             # logger.info("Market Snapshot:\n%s", json.dumps(data))
-        stop_event.wait(0.1)  # 每5秒更新一次
-
-# 启动定期保存和打印线程
-log_and_save_thread = threading.Thread(target=log_and_save_fill_probabilities)
-log_and_save_thread.start()
-
-
-async def ws_inited(ws, path):
-    logger.info("客户端已连接上,ws://localhost:6789初始化完毕")
-    while True:
-        message = await asyncio.get_running_loop().run_in_executor(None, messages.get)
-
-        # 检查连接是否仍然开放
-        if ws.open:
-            message_data = json.dumps(message)  # 将字典序列化为JSON字符串
-            await ws.send(message_data)
-        else:
-            logger.info("WebSocket 连接已关闭")
-            break
-
-async def start_websocket_server():
-    server = await websockets.serve(ws_inited, "localhost", 6789)
-    logger.info("WebSocket 服务器启动在 ws://localhost:6789")
-    await server.wait_closed()
-
-def stop_all_threads():
-    stop_event.set()
-    trade_thread.join()
-    depth_thread.join()
-    log_and_save_thread.join()
-
-# 启动 WebSocket 服务器
-asyncio.run(start_websocket_server())
-
-# 停止所有线程(在需要停止时调用)
-# stop_all_threads()
+        stop_event.wait(0.1)

+ 20 - 0
binance_order_flow/logger_config.py

@@ -0,0 +1,20 @@
+import logging
+import colorlog
+
+# 配置日志
+handler = colorlog.StreamHandler()
+handler.setFormatter(colorlog.ColoredFormatter(
+    "%(log_color)s%(asctime)s - %(name)s - %(levelname)s \n %(message)s",
+    datefmt=None,
+    reset=True,
+    log_colors={
+        'DEBUG': 'cyan',
+        'INFO': 'blue',
+        'WARNING': 'yellow',
+        'ERROR': 'red',
+        'CRITICAL': 'bold_red',
+    }
+))
+logger = logging.getLogger("market_monitor")
+logger.setLevel(logging.INFO)
+logger.addHandler(handler)

+ 18 - 0
binance_order_flow/main.py

@@ -0,0 +1,18 @@
+import threading
+import asyncio
+from ws_client import start_ws_clients, stop_all_threads
+from ws_server import start_websocket_server
+from data_processing import log_and_save_fill_probabilities
+
+# 启动 WebSocket 客户端
+start_ws_clients()
+
+# 启动定期保存和打印线程
+log_and_save_thread = threading.Thread(target=log_and_save_fill_probabilities)
+log_and_save_thread.start()
+
+# 启动 WebSocket 服务器
+asyncio.run(start_websocket_server())
+
+# 停止所有线程(在需要停止时调用)
+# stop_all_threads()

+ 50 - 0
binance_order_flow/ws_client.py

@@ -0,0 +1,50 @@
+import websocket
+import threading
+
+from binance_order_flow.logger_config import logger
+from data_processing import on_message_trade, on_message_depth, stop_event
+
+# Binance WebSocket API URL
+SOCKET_TRADE = "wss://stream.binance.com:9443/ws/btcusdt@trade"
+SOCKET_DEPTH = "wss://stream.binance.com:9443/ws/btcusdt@depth20@100ms"
+
+
+def on_error(_ws, error):
+    logger.error(error)
+
+
+def on_open(_ws):
+    logger.info("### binance_ws opened ###")
+
+
+# Create a WebSocket app
+ws_trade = websocket.WebSocketApp(SOCKET_TRADE, on_message=on_message_trade, on_error=on_error, on_open=on_open)
+ws_depth = websocket.WebSocketApp(SOCKET_DEPTH, on_message=on_message_depth, on_error=on_error, on_open=on_open)
+
+# 定义要传递给 run_forever 的参数
+http_proxy_host = "127.0.0.1"
+http_proxy_port = 7890
+proxy_type = "http"
+
+# Run the WebSocket with proxy settings
+trade_thread = threading.Thread(target=ws_trade.run_forever, kwargs={
+    'http_proxy_host': http_proxy_host,
+    'http_proxy_port': http_proxy_port,
+    'proxy_type': proxy_type
+})
+depth_thread = threading.Thread(target=ws_depth.run_forever, kwargs={
+    'http_proxy_host': http_proxy_host,
+    'http_proxy_port': http_proxy_port,
+    'proxy_type': proxy_type
+})
+
+
+def start_ws_clients():
+    trade_thread.start()
+    depth_thread.start()
+
+
+def stop_all_threads():
+    stop_event.set()
+    trade_thread.join()
+    depth_thread.join()

+ 25 - 0
binance_order_flow/ws_server.py

@@ -0,0 +1,25 @@
+import asyncio
+import json
+import websockets
+from logger_config import logger
+from data_processing import messages
+
+
+async def ws_inited(ws, path):
+    logger.info("客户端已连接上,ws://localhost:6789初始化完毕")
+    while True:
+        message = await asyncio.get_running_loop().run_in_executor(None, messages.get)
+
+        # 检查连接是否仍然开放
+        if ws.open:
+            message_data = json.dumps(message)  # 将字典序列化为JSON字符串
+            await ws.send(message_data)
+        else:
+            logger.info("WebSocket 连接已关闭")
+            break
+
+
+async def start_websocket_server():
+    server = await websockets.serve(ws_inited, "localhost", 6789)
+    logger.info("WebSocket 服务器启动在 ws://localhost:6789")
+    await server.wait_closed()

+ 0 - 35
matplotlib_study.py

@@ -1,35 +0,0 @@
-import pandas as pd
-import numpy as np
-import matplotlib.pyplot as plt
-import seaborn as sns
-from datetime import datetime, timedelta
-
-# 创建示例订单簿数据
-times = [datetime.now() - timedelta(seconds=i) for i in range(10)]
-prices = np.linspace(10000, 10100, 10)
-bid_qtys = np.random.rand(10, 10) * 100  # 随机生成挂单数量
-
-# 创建一个DataFrame来表示订单簿数据
-order_book_df = pd.DataFrame(
-    {
-        'time': np.repeat(times, 10),
-        'price': np.tile(prices, 10),
-        'bid_qty': bid_qtys.flatten()
-    }
-)
-
-# 设置索引为时间和价格
-order_book_df.set_index(['time', 'price'], inplace=True)
-
-# 选择 bid_qty 列来绘制热力图
-heatmap_data = order_book_df['bid_qty'].unstack().T
-
-# 创建热力图
-plt.figure(figsize=(12, 8))
-sns.heatmap(heatmap_data, cmap="Blues", cbar=True, cbar_kws={'label': 'Order Quantity'})
-plt.title('Order Book Heatmap')
-plt.xlabel('Time')
-plt.ylabel('Price')
-plt.xticks(rotation=45)
-plt.tight_layout()
-plt.show()