Răsfoiți Sursa

机器学习初版

skyffire 1 an în urmă
părinte
comite
3f319fd4ec
1 a modificat fișierele cu 123 adăugiri și 58 ștergeri
  1. 123 58
      binance_order_flow/data_processing.py

+ 123 - 58
binance_order_flow/data_processing.py

@@ -1,85 +1,150 @@
 import json
 
+import numpy as np
 import pandas as pd
 import time
 import queue
 import threading
 from datetime import datetime
 from logger_config import logger
+from collections import deque
+from sklearn.linear_model import LogisticRegression
 
-# Initialize the DataFrame
-df_trades = pd.DataFrame(columns=['price', 'qty', 'timestamp'])
-df_order_book = pd.DataFrame(columns=['bid_price', 'bid_qty', 'ask_price', 'ask_qty'])
+# Assume we have a data stream: order book and trade data
+order_book_snapshots = deque(maxlen=10000)  # Rolling buffer of order-book snapshots; keeps at most the 10000 newest entries
+trade_data = deque(maxlen=10000)  # Rolling buffer of trades; keeps at most the 10000 newest entries
 
-previous_order_book = None
-fill_probabilities = {}
-order_disappearances = {}
-order_executions = {}
-spoofing_probabilities = {}
-last_trade = {'price': None, 'qty': None, 'side': None}
+# Data accumulation threshold (minimum number of buffered entries)
+DATA_THRESHOLD = 100
+model = LogisticRegression()  # Initialize the model
 messages = queue.Queue()  # Create a thread-safe queue
 
 stop_event = threading.Event()
 
 def on_message_trade(_ws, message):
-    global df_trades, order_executions, last_trade
+    global trade_data
     json_message = json.loads(message)
     trade = {
         'price': float(json_message['data']['p']),
         'qty': float(json_message['data']['q']),
         'timestamp': pd.to_datetime(json_message['data']['T'], unit='ms'),
-        'side': 'sell' if json_message['data']['m'] else 'buy'  # 'm' indicates是否买方是做市商
+        'side': 'sell' if json_message['data']['m'] else 'buy'
     }
-    trade_df = pd.DataFrame([trade])
-    if not trade_df.empty and not trade_df.isna().all().all():
-        df_trades = pd.concat([df_trades, trade_df], ignore_index=True)
-
-        # 记录每个价格的实际成交总量
-        price = trade['price']
-        last_trade = {'price': price, 'qty': trade['qty'], 'side': trade['side']}
-        if price not in order_executions:
-            order_executions[price] = 0
-        order_executions[price] += trade['qty']
-        show_message()
+    trade_data.append(trade)
+    predict_market_direction()
+
 
 def on_message_depth(_ws, message):
-    global df_order_book, order_disappearances, previous_order_book, spoofing_probabilities
+    global order_book_snapshots
     json_message = json.loads(message)
     bids = json_message['data']['b'][:10]  # Top 10 bids
     asks = json_message['data']['a'][:10]  # Top 10 asks
-    order_book = {
-        'bid_price': [float(bid[0]) for bid in bids],
-        'bid_qty': [float(bid[1]) for bid in bids],
-        'ask_price': [float(ask[0]) for ask in asks],
-        'ask_qty': [float(ask[1]) for ask in asks]
+    timestamp = pd.to_datetime(json_message['data']['E'], unit='ms')
+    order_book_snapshots.append({
+        'bids': bids,
+        'asks': asks,
+        'timestamp': timestamp
+    })
+    predict_market_direction()
+
+
+def extract_features(order_book, trade):
+    # 计算买卖盘差距(spread)
+    best_bid = float(order_book['bids'][0][0])
+    best_ask = float(order_book['asks'][0][0])
+    spread = best_ask - best_bid
+
+    # 计算买卖盘深度
+    bid_depth = sum(float(bid[1]) for bid in order_book['bids'])
+    ask_depth = sum(float(ask[1]) for ask in order_book['asks'])
+
+    # 计算成交量和方向
+    trade_volume = trade['qty']
+    trade_side = 1 if trade['side'] == 'buy' else -1
+
+    features = {
+        'spread': spread,
+        'bid_depth': bid_depth,
+        'ask_depth': ask_depth,
+        'trade_volume': trade_volume,
+        'trade_side': trade_side
     }
-    df_order_book = pd.DataFrame([order_book])
-
-    show_message()
-
-
-# 计算成交概率
-def show_message():
-    global df_order_book, last_trade
-
-    if not df_order_book.empty and last_trade['price'] is not None:
-        last_price = last_trade['price']
-        asks = [[price, qty] for price, qty in
-                zip(df_order_book['ask_price'].iloc[0], df_order_book['ask_qty'].iloc[0])]
-        bids = [[price, qty] for price, qty in
-                zip(df_order_book['bid_price'].iloc[0], df_order_book['bid_qty'].iloc[0])]
-
-        asks_sorted = sorted(asks, key=lambda x: x[0])
-        bids_sorted = sorted(bids, key=lambda x: x[0], reverse=True)
-
-        last_qty = last_trade['qty']
-        side = last_trade['side']
-        data = {
-            "asks": asks_sorted,
-            "bids": bids_sorted,
-            "last_price": last_price,
-            "last_qty": last_qty,
-            "side": side,
-            "time": int(time.time() * 1000)
-        }
-        messages.put(data)
+
+    return features
+
+
+def prepare_training_data():
+    """Build the feature matrix and label vector from the buffered data.
+
+    Sample ``i`` pairs ``order_book_snapshots[i]`` with ``trade_data[i]`` and
+    is labelled by comparing the best bid of snapshot ``i`` with the best bid
+    of snapshot ``i + 1``.
+
+    Returns:
+        A ``(X_train, y_train)`` tuple of NumPy arrays. Both are empty when
+        fewer than two snapshots or no trades are buffered.
+    """
+    # Work on copies: indexing into the middle of a deque is O(n), and a copy
+    # gives the loop a stable view of both buffers.
+    snapshots = list(order_book_snapshots)
+    trades = list(trade_data)
+
+    X = []
+    y = []
+
+    # zip stops at the shortest input, so a trade buffer that is shorter than
+    # the snapshot buffer no longer triggers an IndexError.
+    for current_order_book, future_order_book, current_trade in zip(
+        snapshots, snapshots[1:], trades
+    ):
+        # Features of the current snapshot/trade pair
+        features = extract_features(current_order_book, current_trade)
+        X.append(list(features.values()))
+
+        # Label: did the best bid rise in the next snapshot?
+        current_price = float(current_order_book['bids'][0][0])
+        future_price = float(future_order_book['bids'][0][0])
+        y.append(generate_label(current_price, future_price))
+
+    # Convert features and labels to NumPy arrays
+    X_train = np.array(X)
+    y_train = np.array(y)
+
+    return X_train, y_train
+
+
+def generate_label(current_price, future_price):
+    """Return 1 if ``future_price`` is strictly above ``current_price``, else 0."""
+    if future_price > current_price:
+        return 1
+    return 0
+
+
+def check_and_train_model():
+    """Fit the global ``model`` once enough data has been buffered.
+
+    Training runs only when both ``order_book_snapshots`` and ``trade_data``
+    hold at least ``DATA_THRESHOLD`` entries. The fit is skipped when the
+    prepared labels contain fewer than two distinct classes, because
+    ``LogisticRegression.fit`` raises ``ValueError`` for single-class data.
+    """
+    if len(order_book_snapshots) < DATA_THRESHOLD or len(trade_data) < DATA_THRESHOLD:
+        return
+
+    X_train, y_train = prepare_training_data()
+    if len(np.unique(y_train)) < 2:
+        logger.info("Skipping training: labels contain fewer than two classes")
+        return
+
+    model.fit(X_train, y_train)
+    # Build the message up front: passing extra positional arguments without
+    # placeholders in the message never renders them in the log output.
+    logger.info(f"Model trained with {len(X_train)} samples")
+
+
+def predict_market_direction():
+    """Log the predicted direction for the newest snapshot/trade pair.
+
+    Does nothing beyond logging when either buffer is empty or when the
+    global ``model`` has not been fitted yet. Otherwise logs "Up" when the
+    model predicts class 1 and "Down" for any other class.
+    """
+    if not order_book_snapshots or not trade_data:
+        logger.info("Not enough data to make a prediction")
+        return
+
+    # A fitted scikit-learn classifier exposes ``classes_``; calling
+    # ``predict`` before ``fit`` raises NotFittedError.
+    if not hasattr(model, "classes_"):
+        logger.debug("Model is not trained yet; skipping prediction")
+        return
+
+    features = extract_features(order_book_snapshots[-1], trade_data[-1])
+    feature_vector = np.array([list(features.values())])
+    prediction = model.predict(feature_vector)
+
+    direction = "Up" if prediction[0] == 1 else "Down"
+    # Build the message up front: passing extra positional arguments without
+    # placeholders in the message never renders them in the log output.
+    logger.info(f"Predicted Market Direction: {direction}")
+
+
+# # 计算成交概率
+# def show_message():
+#     global df_order_book, last_trade
+#
+#     if not df_order_book.empty and last_trade['price'] is not None:
+#         last_price = last_trade['price']
+#         asks = [[price, qty] for price, qty in
+#                 zip(df_order_book['ask_price'].iloc[0], df_order_book['ask_qty'].iloc[0])]
+#         bids = [[price, qty] for price, qty in
+#                 zip(df_order_book['bid_price'].iloc[0], df_order_book['bid_qty'].iloc[0])]
+#
+#         asks_sorted = sorted(asks, key=lambda x: x[0])
+#         bids_sorted = sorted(bids, key=lambda x: x[0], reverse=True)
+#
+#         last_qty = last_trade['qty']
+#         side = last_trade['side']
+#         data = {
+#             "asks": asks_sorted,
+#             "bids": bids_sorted,
+#             "last_price": last_price,
+#             "last_qty": last_qty,
+#             "side": side,
+#             "time": int(time.time() * 1000)
+#         }
+#         messages.put(data)