"""Real-time market-direction prediction from order-book and trade streams.

The websocket callbacks ``on_message_trade`` and ``on_message_depth`` buffer the
most recent trades and order-book snapshots. Once enough data has accumulated they
(re)train a random-forest classifier. Each label says whether the best bid of the
next snapshot is higher than the current one. The callbacks then predict from the
latest data and push a display payload onto the thread-safe ``messages`` queue.
"""
import json
import queue
import threading
import time
from collections import deque

import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report
from sklearn.model_selection import train_test_split
from sklearn.utils import resample

from logger_config import logger

# Rolling buffers of the most recent messages. maxlen bounds the NUMBER of
# entries kept (100), not a time window.
order_book_snapshots = deque(maxlen=100)  # latest order-book snapshots (top 10 levels per side)
trade_data = deque(maxlen=100)  # latest trades

# Minimum number of buffered snapshots AND trades required before training.
DATA_THRESHOLD = 20

# Order of the values in a feature vector; extract_features() returns a dict
# with exactly these keys.
FEATURE_COLUMNS = ['spread', 'bid_depth', 'ask_depth', 'trade_volume',
                   'trade_side', 'bid_count', 'ask_count', 'timestamp']

model = RandomForestClassifier(n_estimators=100)  # random-forest classifier
model_trained = False  # becomes True once model.fit() has run at least once

messages = queue.Queue()  # thread-safe queue of display payloads (see show_message)
stop_event = threading.Event()


def on_message_trade(_ws, message):
    """Websocket callback for trade messages.

    Parses ``message['data']``:

    - ``p`` is the price and ``q`` the quantity.
    - ``T`` is the trade time in ms.
    - ``m`` is a flag; truthy is recorded as a 'sell'.

    The trade is appended to ``trade_data``; the model is then retrained if enough
    data is buffered, and a prediction is made.
    """
    data = json.loads(message)['data']
    trade_data.append({
        'price': float(data['p']),
        'qty': float(data['q']),
        'timestamp': pd.to_datetime(data['T'], unit='ms'),
        # presumably the exchange's "buyer is maker" flag, i.e. an aggressive
        # seller when True -- confirm against the stream documentation
        'side': 'sell' if data['m'] else 'buy',
    })
    check_and_train_model()
    predict_market_direction()


def on_message_depth(_ws, message):
    """Websocket callback for order-book depth messages.

    Keeps the first 10 levels of ``message['data']['b']`` (bids) and
    ``message['data']['a']`` (asks), each a list of ``[price, qty]`` strings. It
    stores them with the event time ``E`` (ms) in ``order_book_snapshots``. The
    model is then retrained if enough data is buffered, and a prediction is made.

    A message with an empty bid or ask side is ignored, because every downstream
    consumer reads the best level (index 0) of both sides.
    """
    data = json.loads(message)['data']
    bids = data['b'][:10]  # top 10 bids
    asks = data['a'][:10]  # top 10 asks
    if not bids or not asks:
        logger.debug("Skipping order book message with an empty side")
        return
    order_book_snapshots.append({
        'bids': bids,
        'asks': asks,
        'timestamp': pd.to_datetime(data['E'], unit='ms'),
    })
    check_and_train_model()
    predict_market_direction()


def extract_features(order_book, trade):
    """Build the feature dict for one (order-book snapshot, trade) pair.

    Args:
        order_book: snapshot dict with 'bids'/'asks' lists of [price, qty] strings.
        trade: trade dict as built by on_message_trade().

    Returns:
        A dict keyed by FEATURE_COLUMNS (in that order). Returns None when either
        side of the book is empty, i.e. there is no best price.
    """
    bids = order_book['bids']
    asks = order_book['asks']
    if not bids or not asks:
        return None

    best_bid = float(bids[0][0])
    best_ask = float(asks[0][0])
    return {
        'spread': best_ask - best_bid,
        'bid_depth': sum(float(level[1]) for level in bids),  # total bid quantity
        'ask_depth': sum(float(level[1]) for level in asks),  # total ask quantity
        'trade_volume': trade['qty'],
        'trade_side': 1 if trade['side'] == 'buy' else -1,
        'bid_count': len(bids),  # number of bid levels kept
        'ask_count': len(asks),  # number of ask levels kept
        # NOTE(review): an absolute epoch time (seconds) is a non-stationary
        # feature; consider dropping it or using a relative time.
        'timestamp': trade['timestamp'].timestamp(),
    }


def _feature_vector(features):
    """Return the feature values of *features* as a list in FEATURE_COLUMNS order."""
    return [features[name] for name in FEATURE_COLUMNS]


def prepare_training_data():
    """Build (X, y) training arrays from the buffered snapshots and trades.

    Sample i pairs snapshot i with trade i, matched by position in the buffers.
    Its label comes from generate_label() applied to the best bids of snapshots
    i and i+1. Pairs with an empty book side are skipped.

    Returns:
        (X, y) as NumPy arrays; X has one row of len(FEATURE_COLUMNS) values
        per sample.
    """
    # Work on copies so indices stay consistent while we walk the buffers.
    books = list(order_book_snapshots)
    trades = list(trade_data)

    X, y = [], []
    # zip() stops at the shortest input: min(len(books) - 1, len(trades)) pairs.
    # NOTE(review): trades and snapshots are paired by position, not by
    # timestamp, so trade i need not be close in time to snapshot i.
    for current_book, future_book, trade in zip(books, books[1:], trades):
        features = extract_features(current_book, trade)
        if features is None or not future_book['bids']:
            continue
        X.append(_feature_vector(features))
        current_price = float(current_book['bids'][0][0])
        future_price = float(future_book['bids'][0][0])
        y.append(generate_label(current_price, future_price))

    return np.array(X), np.array(y)


def generate_label(current_price, future_price):
    """Return 1 if *future_price* is strictly higher than *current_price*, else 0.

    An unchanged price is labelled 0 ("Down").
    """
    return 1 if future_price > current_price else 0


def balance_data(X, y):
    """Balance the two label classes by resampling.

    If label 0 has more rows, it is downsampled without replacement to the size
    of label 1. Otherwise label 1 is upsampled with replacement to the size of
    label 0. If either class is absent, X and y are returned unchanged.

    Returns:
        (X_balanced, y_balanced) as NumPy arrays; label-0 rows come first.
    """
    df = pd.DataFrame(X, columns=FEATURE_COLUMNS)
    df['label'] = y

    df_zero = df[df['label'] == 0]
    df_one = df[df['label'] == 1]
    if df_zero.empty or df_one.empty:
        return X, y  # nothing to balance against

    if len(df_zero) > len(df_one):
        df_zero = resample(df_zero, replace=False, n_samples=len(df_one), random_state=42)
    else:
        df_one = resample(df_one, replace=True, n_samples=len(df_zero), random_state=42)
    df_balanced = pd.concat([df_zero, df_one])

    return df_balanced.drop('label', axis=1).values, df_balanced['label'].values


def check_and_train_model():
    """Retrain the global model once both buffers hold >= DATA_THRESHOLD entries.

    The buffered samples are balanced and split 80/20. The model is fit on the
    training part, ``model_trained`` is set, and a classification report for the
    held-out part is logged.
    """
    global model_trained

    n_books, n_trades = len(order_book_snapshots), len(trade_data)
    if n_books < DATA_THRESHOLD or n_trades < DATA_THRESHOLD:
        logger.info("Not enough data to train the model: %d order book snapshots, %d trades",
                    n_books, n_trades)
        return

    X, y = prepare_training_data()
    # train_test_split needs at least 2 samples to produce a non-empty
    # train set and a non-empty test set.
    if len(X) < 2:
        logger.info("Insufficient data to train the model")
        return

    # NOTE(review): balancing before the split lets upsampled duplicates land in
    # both the train and test sets, which inflates the logged report.
    X, y = balance_data(X, y)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # NOTE(review): the forest is refit on every incoming message. If the trade
    # and depth callbacks run on different threads, predict() may also run while
    # fit() is rebuilding it; consider throttling and/or a lock.
    model.fit(X_train, y_train)
    model_trained = True
    logger.info("Model trained with %d samples", len(X_train))

    y_pred = model.predict(X_test)
    logger.info("\n" + classification_report(y_test, y_pred, zero_division=0))


def predict_market_direction():
    """Predict the direction from the latest snapshot and trade, log it, and
    publish it via show_message().

    Does nothing until the model has been trained, when a buffer is empty, or
    when the latest snapshot has an empty side.
    """
    if not model_trained or not order_book_snapshots or not trade_data:
        return

    features = extract_features(order_book_snapshots[-1], trade_data[-1])
    if features is None:
        return

    prediction = model.predict(np.array([_feature_vector(features)]))[0]
    logger.info("Predicted Market Direction: %s", "Up" if prediction == 1 else "Down")
    show_message(prediction)


def show_message(market_direction):
    """Put a display payload for the latest book and trade onto ``messages``.

    Each price level's quantity is replaced by a 0/1 marker:

    - Asks get 1 when ``market_direction == 1`` (Up).
    - Bids get 1 when ``market_direction == 0`` (Down).

    Asks are sorted by ascending price and bids by descending price. The payload
    also carries the last trade's price and side, ``last_qty`` (always 0), and
    the current wall-clock time in ms.
    """
    if not order_book_snapshots or not trade_data:
        return

    latest_book = order_book_snapshots[-1]
    latest_trade = trade_data[-1]

    ask_marker = 1 if market_direction == 1 else 0
    bid_marker = 1 if market_direction == 0 else 0
    asks = sorted(([float(price), ask_marker] for price, _qty in latest_book['asks']),
                  key=lambda level: level[0])
    bids = sorted(([float(price), bid_marker] for price, _qty in latest_book['bids']),
                  key=lambda level: level[0], reverse=True)

    messages.put({
        "asks": asks,
        "bids": bids,
        "last_price": latest_trade['price'],
        "last_qty": 0,  # the trade quantity is not published
        "side": latest_trade['side'],
        "time": int(time.time() * 1000),
    })