Ver Fonte

log(λ(δ))可以计算了,使用的trades数据。

skyffire há 1 ano atrás
pai
commit
d6fe565c28
4 ficheiros alterados com 44 adições e 51 exclusões
  1. 32 20
      kappa/data_processing.py
  2. 0 4
      kappa/main.py
  3. 12 2
      kappa/ws_client.py
  4. 0 25
      kappa/ws_server.py

+ 32 - 20
kappa/data_processing.py

@@ -8,11 +8,8 @@ import numpy as np
 from logger_config import logger
 
 # 假设我们有一个数据流,订单簿和成交数据
-order_book_snapshots = deque(maxlen=600)  # 存储过去600个订单簿快照
-
-# 数据积累的阈值
-DATA_THRESHOLD = 20
-messages = queue.Queue()  # 创建一个线程安全队列
+order_book_snapshots = deque(maxlen=600)        # 存储过去600个订单簿快照
+trade_snapshots = deque(maxlen=6000)            # 存储过去6000个成交数据
 
 stop_event = threading.Event()
 
@@ -22,7 +19,19 @@ A_initial = 1.0
 
 # 假设S0是初始的参考价格
 S0 = -1
-sigma = 0.2  # 假设一个波动率
+
+
+def on_message_trade(_ws, message):
+    global trade_snapshots
+    json_message = json.loads(message)
+    trade = {
+        'price': float(json_message['data']['p']),
+        'qty': float(json_message['data']['q']),
+        'timestamp': pd.to_datetime(json_message['data']['T'], unit='ms'),
+        'side': 'sell' if json_message['data']['m'] else 'buy'
+    }
+    trade_snapshots.append(trade)
+    process_depth_data()
 
 
 def on_message_depth(_ws, message):
@@ -36,9 +45,7 @@ def on_message_depth(_ws, message):
         'asks': asks,
         'timestamp': timestamp
     })
-
-    if len(order_book_snapshots) >= DATA_THRESHOLD:
-        process_depth_data(order_book_snapshots)
+    process_depth_data()
 
 
 def calculate_phi(prices, k, S0):
@@ -84,14 +91,16 @@ def estimate_lambda(waiting_times, T):
     waiting_times = np.array(waiting_times)
 
     sum_indicator = np.sum(waiting_times < T)
-    sum_waiting_times = np.sum(waiting_times)
+    sum_waiting_times = int(np.sum(waiting_times).total_seconds() * 1000)
     lambda_hat = sum_indicator / sum_waiting_times
     return lambda_hat
 
 
-def process_depth_data(order_book_snapshots):
-    # 数据预热,至少10条数据才能用于计算
-    if len(order_book_snapshots) < 10:
+def process_depth_data():
+    global order_book_snapshots, trade_snapshots
+
+    # 数据预热,至少10条深度数据以及100条成交数据才能用于计算
+    if len(order_book_snapshots) < 10 and len(trade_snapshots) < 100:
         return
 
     global k_initial, A_initial, S0
@@ -100,23 +109,26 @@ def process_depth_data(order_book_snapshots):
     if S0 < 0:
         S0 = S_values[0]
 
+    # ========================= 计算 log(∫ φ(k, ξ) dξ) ==================
     # 提取时间戳并计算时间间隔
-    timestamps = [snapshot['timestamp'] for snapshot in order_book_snapshots]
-    order_book_time_points = [(timestamp - timestamps[0]).total_seconds() for timestamp in timestamps]
+    order_book_timestamps = [snapshot['timestamp'] for snapshot in order_book_snapshots]
+    order_book_time_points = [(timestamp - order_book_timestamps[0]).total_seconds() for timestamp in order_book_timestamps]
 
     # 计算 ∫ φ(k, ξ) dξ
     integral_phi_value = calculate_integral_phi(S_values, k_initial, S0, order_book_time_points)
 
     # 计算 log(∫ φ(k, ξ) dξ)
     log_integral_phi_value = np.log(integral_phi_value)
-    logger.info("log(∫ φ(k, ξ) dξ) 的值: " + str(log_integral_phi_value))
 
+    # ========================== 估计 λ(δ) ==============================
     # 计算等待时间
-    waiting_times = [order_book_time_points[i] - order_book_time_points[i - 1] for i in range(1, len(order_book_time_points))]
-    T = 0.25  # 时间窗口的大小
+    trade_timestamps = [snapshot['timestamp'] for snapshot in trade_snapshots]
+    waiting_times = [trade_timestamps[i] - trade_timestamps[i - 1] for i in range(1, len(trade_timestamps))]
+    # 时间窗口的大小
+    T = pd.to_datetime(100, unit='ms') - pd.to_datetime(0, unit='ms')
 
-    # 估计 λ(δ)
     lambda_hat = estimate_lambda(waiting_times, T)
+    # logger.info("λ(δ) 的值: " + str(lambda_hat) + "log(∫ φ(k, ξ) dξ) 的值: " + str(log_integral_phi_value))
     log_lambda_hat_value = np.log(lambda_hat)
-    logger.info("log(λ(δ)) 的值: " + str(log_lambda_hat_value))
+    logger.info("log(λ(δ)) 的值: " + str(log_lambda_hat_value) + "log(∫ φ(k, ξ) dξ) 的值: " + str(log_integral_phi_value))
 

+ 0 - 4
kappa/main.py

@@ -1,6 +1,5 @@
 import asyncio
 from ws_client import start_ws_clients
-from ws_server import start_websocket_server
 import warnings
 
 # 忽略 FutureWarning
@@ -9,8 +8,5 @@ warnings.simplefilter(action='ignore', category=FutureWarning)
 # 启动 WebSocket 客户端
 start_ws_clients()
 
-# 启动 WebSocket 服务器
-asyncio.run(start_websocket_server())
-
 # 停止所有线程(在需要停止时调用)
 # stop_all_threads()

+ 12 - 2
kappa/ws_client.py

@@ -4,10 +4,11 @@ import websocket
 import threading
 
 from logger_config import logger
-from data_processing import on_message_depth, stop_event
+from data_processing import on_message_depth, on_message_trade, stop_event
 
 # Binance WebSocket API URL
-SYMBOL = "not" + "usdt"
+SYMBOL = "1000pepe" + "usdt"
+SOCKET_TRADE = "wss://fstream.binance.com/stream?streams=" + SYMBOL + "@trade"
 SOCKET_DEPTH = "wss://fstream.binance.com/stream?streams=" + SYMBOL + "@depth20@100ms"
 
 
@@ -21,6 +22,7 @@ def on_open(_ws):
 
 
 # Create a WebSocket app
+ws_trade = websocket.WebSocketApp(SOCKET_TRADE, on_message=on_message_trade, on_error=on_error, on_open=on_open)
 ws_depth = websocket.WebSocketApp(SOCKET_DEPTH, on_message=on_message_depth, on_error=on_error, on_open=on_open)
 
 # 定义要传递给 run_forever 的参数
@@ -28,6 +30,12 @@ http_proxy_host = "127.0.0.1"
 http_proxy_port = 7890
 proxy_type = "http"
 
+# Run the WebSocket with proxy settings
+trade_thread = threading.Thread(target=ws_trade.run_forever, kwargs={
+    'http_proxy_host': http_proxy_host,
+    'http_proxy_port': http_proxy_port,
+    'proxy_type': proxy_type
+})
 depth_thread = threading.Thread(target=ws_depth.run_forever, kwargs={
     'http_proxy_host': http_proxy_host,
     'http_proxy_port': http_proxy_port,
@@ -36,9 +44,11 @@ depth_thread = threading.Thread(target=ws_depth.run_forever, kwargs={
 
 
 def start_ws_clients():
+    trade_thread.start()
     depth_thread.start()
 
 
 def stop_all_threads():
     stop_event.set()
+    trade_thread.join()
     depth_thread.join()

+ 0 - 25
kappa/ws_server.py

@@ -1,25 +0,0 @@
-import asyncio
-import json
-import websockets
-from logger_config import logger
-from data_processing import messages
-
-
-async def ws_inited(ws, path):
-    logger.info("客户端已连接上,ws://localhost:6789初始化完毕")
-    while True:
-        message = await asyncio.get_running_loop().run_in_executor(None, messages.get)
-
-        # 检查连接是否仍然开放
-        if ws.open:
-            message_data = json.dumps(message)  # 将字典序列化为JSON字符串
-            await ws.send(message_data)
-        else:
-            logger.info("WebSocket 连接已关闭")
-            break
-
-
-async def start_websocket_server():
-    server = await websockets.serve(ws_inited, "localhost", 6789)
-    logger.info("WebSocket 服务器启动在 ws://localhost:6789")
-    await server.wait_closed()