skyffire 1 year ago
parent
commit
cc6e32d5df

+ 0 - 104
binance_order_flow/data_processing.py

@@ -1,104 +0,0 @@
-import json
-import time
-import numpy as np
-import pandas as pd
-import queue
-import threading
-from collections import deque
-from logger_config import logger
-
-# Assume we have a data stream of order book and trade data
-order_book_snapshots = deque(maxlen=100)  # order book snapshots from the past 100 ms
-trade_data = deque(maxlen=100)  # trade data from the past 100 ms
-prediction_window = deque(maxlen=10)  # sliding window for smoothing prediction results
-
-# Threshold for data accumulation
-DATA_THRESHOLD = 20
-messages = queue.Queue()  # create a thread-safe queue
-
-stop_event = threading.Event()
-
-
-def on_message_trade(_ws, message):
-    global trade_data
-    json_message = json.loads(message)
-    trade = {
-        'price': float(json_message['data']['p']),
-        'qty': float(json_message['data']['q']),
-        'timestamp': pd.to_datetime(json_message['data']['T'], unit='ms'),
-        'side': 'sell' if json_message['data']['m'] else 'buy'
-    }
-    trade_data.append(trade)
-    predict_market_direction()
-
-
-def on_message_depth(_ws, message):
-    global order_book_snapshots
-    json_message = json.loads(message)
-    bids = json_message['data']['b'][:10]  # Top 10 bids
-    asks = json_message['data']['a'][:10]  # Top 10 asks
-    timestamp = pd.to_datetime(json_message['data']['E'], unit='ms')
-    order_book_snapshots.append({
-        'bids': bids,
-        'asks': asks,
-        'timestamp': timestamp
-    })
-    predict_market_direction()
-
-
-def predict_market_direction():
-    global prediction_window
-    if len(trade_data) == 0:
-        return
-
-    # Tally buy and sell quantity over the past 100 ms
-    buy_count = sum(trade['qty'] for trade in trade_data if trade['side'] == 'buy')
-    sell_count = sum(trade['qty'] for trade in trade_data if trade['side'] == 'sell')
-
-    # Simple prediction logic: more buying predicts up, more selling predicts down
-    prediction = 1 if buy_count > sell_count else 0
-
-    # Append the prediction to the sliding window
-    prediction_window.append(prediction)
-
-    # Determine the current predicted direction from the majority vote of the sliding window
-    if len(prediction_window) == prediction_window.maxlen:
-        smoothed_prediction = 1 if sum(prediction_window) > len(prediction_window) / 2 else 0
-        logger.info("Predicted Market Direction (smoothed): %s", "Up" if smoothed_prediction == 1 else "Down")
-        show_message(smoothed_prediction)
-
-
-def show_message(market_direction):
-    global order_book_snapshots, trade_data
-
-    if len(order_book_snapshots) > 0 and len(trade_data) > 0:
-        # Get the latest order book data and trade data
-        latest_order_book = order_book_snapshots[-1]
-        latest_trade = trade_data[-1]
-
-        # Extract the asks and bids data
-        asks = [[float(price), 0] for price, qty in latest_order_book['asks']]
-        bids = [[float(price), 0] for price, qty in latest_order_book['bids']]
-
-        # Sort the asks and bids data
-        asks_sorted = sorted(asks, key=lambda x: x[0])
-        asks_sorted[-1] = [asks_sorted[-1][0], 1 if market_direction == 1 else 0]
-        bids_sorted = sorted(bids, key=lambda x: x[0], reverse=True)
-        bids_sorted[-1] = [bids_sorted[-1][0], 0 if market_direction == 1 else 1]
-
-        last_price = latest_trade['price']
-        last_qty = latest_trade['qty']
-        side = latest_trade['side']
-
-        # Build the data dictionary
-        data = {
-            "asks": asks_sorted,
-            "bids": bids_sorted,
-            "last_price": last_price,
-            "last_qty": last_qty,
-            "side": side,
-            "time": int(time.time() * 1000)
-        }
-
-        # Put the data onto the message queue
-        messages.put(data)

+ 98 - 0
kappa/data_processing.py

@@ -0,0 +1,98 @@
+import json
+import scipy.stats as stats
+import numpy as np
+import pandas as pd
+import queue
+import threading
+from collections import deque
+from logger_config import logger
+
+# Assume we have a data stream of order book and trade data
+order_book_snapshots = deque(maxlen=100)  # order book snapshots from the past 100 ms
+
+# Threshold for data accumulation
+DATA_THRESHOLD = 20
+messages = queue.Queue()  # create a thread-safe queue
+
+stop_event = threading.Event()
+
+
+# Parameters for computing kappa
+gamma = 0.1  # inventory risk-aversion parameter
+sigma = 0.02  # market volatility
+T_minus_t = 1  # remaining time in the trading session
+
+# def on_message_trade(_ws, message):
+#     global trade_data
+#     json_message = json.loads(message)
+#     trade = {
+#         'price': float(json_message['data']['p']),
+#         'qty': float(json_message['data']['q']),
+#         'timestamp': pd.to_datetime(json_message['data']['T'], unit='ms'),
+#         'side': 'sell' if json_message['data']['m'] else 'buy'
+#     }
+#     trade_data.append(trade)
+#     predict_market_direction()
+
+
+def on_message_depth(_ws, message):
+    global order_book_snapshots
+    json_message = json.loads(message)
+    bids = json_message['data']['b'][:10]  # Top 10 bids
+    asks = json_message['data']['a'][:10]  # Top 10 asks
+    timestamp = pd.to_datetime(json_message['data']['E'], unit='ms')
+    order_book_snapshots.append({
+        'bids': bids,
+        'asks': asks,
+        'timestamp': timestamp
+    })
+    process_depth_data(bids, asks)
+
+
+def process_depth_data(bids, asks):
+    # Extract order size data from the depth update
+    order_sizes = []
+    for bid in bids:
+        order_sizes.append(float(bid[1]))
+    for ask in asks:
+        order_sizes.append(float(ask[1]))
+
+    if len(order_sizes) < 2:  # 确保有足够的数据进行计算
+        logger.warning("Not enough data to calculate kappa.")
+        return
+
+    # Estimate the power-law exponent β via maximum likelihood (MLE)
+    Q_min = np.min(order_sizes)
+    n = len(order_sizes)
+    log_sum = np.sum(np.log(np.asarray(order_sizes) / Q_min)) if Q_min > 0 else 0.0
+    if log_sum <= 0:
+        logger.error("Degenerate order sizes: cannot estimate beta via MLE.")
+        return
+    beta_hat = 1 + n / log_sum
+
+    # Fit the power-law distribution with least squares
+    log_order_sizes = np.log(order_sizes)
+    hist, bin_edges = np.histogram(log_order_sizes, bins=50, density=True)
+    bin_centers = (bin_edges[:-1] + bin_edges[1:]) / 2
+
+    # Filter out bins with zero frequency
+    non_zero_indices = hist > 0
+    hist = hist[non_zero_indices]
+    bin_centers = bin_centers[non_zero_indices]
+
+    if len(hist) == 0 or len(bin_centers) == 0:
+        logger.warning("No non-zero bins available for linear regression.")
+        return
+
+    try:
+        slope, intercept, r_value, p_value, std_err = stats.linregress(bin_centers, np.log(hist))
+        beta_lsm = -slope  # least-squares estimate of β, kept for comparison with the MLE value
+    except ValueError as e:
+        logger.error(f"ValueError encountered in linear regression: {e}")
+        return
+
+    # Compute the kappa value
+    k = beta_hat  # here we take the MLE-estimated β as k
+    logger.info(f"Estimated kappa: {k}")
+    # kappa = (2 / (gamma * sigma**2 * T_minus_t)) * np.log(1 + (gamma / k))
+    # logger.info(f"Estimated Kappa: {kappa}")
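For reference, a minimal standalone sketch (not part of this commit) of how the MLE power-law estimate and the commented-out kappa formula above could fit together; estimate_kappa and the sample order sizes below are hypothetical:

import numpy as np

def estimate_kappa(order_sizes, gamma=0.1, sigma=0.02, T_minus_t=1):
    # Hypothetical helper: MLE power-law exponent of the order sizes, then the
    # kappa transform mirroring the commented-out line in the diff above.
    sizes = np.asarray(order_sizes, dtype=float)
    q_min = sizes.min()
    if q_min <= 0:
        return None  # non-positive sizes break the power-law fit
    log_sum = np.sum(np.log(sizes / q_min))
    if log_sum <= 0:
        return None  # all sizes identical: exponent undefined
    k = 1 + len(sizes) / log_sum
    return (2 / (gamma * sigma**2 * T_minus_t)) * np.log(1 + gamma / k)

# Example with ten fake order sizes from a heavy-tailed book
print(estimate_kappa([0.5, 1.2, 0.8, 3.4, 0.6, 2.1, 0.9, 5.0, 1.1, 0.7]))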

+ 0 - 0
binance_order_flow/logger_config.py → kappa/logger_config.py


+ 0 - 0
binance_order_flow/main.py → kappa/main.py


+ 10 - 10
binance_order_flow/ws_client.py → kappa/ws_client.py

@@ -2,11 +2,11 @@ import websocket
 import threading
 
 from logger_config import logger
-from data_processing import on_message_trade, on_message_depth, stop_event
+from data_processing import on_message_depth, stop_event
 
 # Binance WebSocket API URL
 SYMBOL = "token" + "usdt"
-SOCKET_TRADE = "wss://fstream.binance.com/stream?streams=" + SYMBOL + "@trade"
+# SOCKET_TRADE = "wss://fstream.binance.com/stream?streams=" + SYMBOL + "@trade"
 SOCKET_DEPTH = "wss://fstream.binance.com/stream?streams=" + SYMBOL + "@depth20@100ms"
 
 
@@ -19,7 +19,7 @@ def on_open(_ws):
 
 
 # Create a WebSocket app
-ws_trade = websocket.WebSocketApp(SOCKET_TRADE, on_message=on_message_trade, on_error=on_error, on_open=on_open)
+# ws_trade = websocket.WebSocketApp(SOCKET_TRADE, on_message=on_message_trade, on_error=on_error, on_open=on_open)
 ws_depth = websocket.WebSocketApp(SOCKET_DEPTH, on_message=on_message_depth, on_error=on_error, on_open=on_open)
 
 # Parameters to pass to run_forever
@@ -28,11 +28,11 @@ http_proxy_port = 7890
 proxy_type = "http"
 
 # Run the WebSocket with proxy settings
-trade_thread = threading.Thread(target=ws_trade.run_forever, kwargs={
-    'http_proxy_host': http_proxy_host,
-    'http_proxy_port': http_proxy_port,
-    'proxy_type': proxy_type
-})
+# trade_thread = threading.Thread(target=ws_trade.run_forever, kwargs={
+#     'http_proxy_host': http_proxy_host,
+#     'http_proxy_port': http_proxy_port,
+#     'proxy_type': proxy_type
+# })
 depth_thread = threading.Thread(target=ws_depth.run_forever, kwargs={
     'http_proxy_host': http_proxy_host,
     'http_proxy_port': http_proxy_port,
@@ -41,11 +41,11 @@ depth_thread = threading.Thread(target=ws_depth.run_forever, kwargs={
 
 
 def start_ws_clients():
-    trade_thread.start()
+    # trade_thread.start()
     depth_thread.start()
 
 
 def stop_all_threads():
     stop_event.set()
-    trade_thread.join()
+    # trade_thread.join()
     depth_thread.join()
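A hypothetical usage sketch (not in the commit) for the renamed module: start the depth client and let it feed process_depth_data for a while.

import time
from ws_client import start_ws_clients, stop_all_threads

start_ws_clients()   # starts depth_thread (the trade thread is commented out)
time.sleep(60)       # let the depth stream drive the kappa estimation for a minute
stop_all_threads()   # sets stop_event and joins depth_thread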

+ 0 - 0
binance_order_flow/ws_server.py → kappa/ws_server.py