"""Collect trade and depth messages and publish a smoothed direction signal.

Incoming trade and order-book messages are buffered in bounded deques. After
every message a buy/sell volume comparison is appended to a small sliding
window; once that window is full, its majority vote is logged and a display
payload is pushed onto the ``messages`` queue.
"""
import json
import queue
import threading
import time
from collections import deque

import numpy as np
import pandas as pd

from logger_config import logger

# Rolling buffers for the incoming stream. They are bounded by entry count
# (the most recent 100 items), not by elapsed time.
order_book_snapshots = deque(maxlen=100)  # latest order-book snapshots
trade_data = deque(maxlen=100)  # latest trades
prediction_window = deque(maxlen=10)  # raw predictions used for smoothing

# Data accumulation threshold (not referenced by the code in this section).
DATA_THRESHOLD = 20

messages = queue.Queue()  # thread-safe queue of outgoing display payloads
stop_event = threading.Event()


def on_message_trade(_ws, message):
    """Parse one trade message, buffer it, and refresh the prediction.

    Args:
        _ws: The websocket object supplied by the caller (unused).
        message: JSON text whose ``data`` object carries ``p`` (price),
            ``q`` (quantity), ``T`` (timestamp in milliseconds) and ``m``.

    Raises:
        KeyError, ValueError: If the payload is missing a field or a field
            cannot be converted.
    """
    payload = json.loads(message)['data']
    trade_data.append({
        'price': float(payload['p']),
        'qty': float(payload['q']),
        'timestamp': pd.to_datetime(payload['T'], unit='ms'),
        # A truthy ``m`` is recorded as a sell, anything else as a buy.
        'side': 'sell' if payload['m'] else 'buy',
    })
    predict_market_direction()


def on_message_depth(_ws, message):
    """Parse one depth message, buffer its top levels, refresh the prediction.

    Args:
        _ws: The websocket object supplied by the caller (unused).
        message: JSON text whose ``data`` object carries ``b`` (bids),
            ``a`` (asks) and ``E`` (timestamp in milliseconds).

    Raises:
        KeyError, ValueError: If the payload is missing a field or a field
            cannot be converted.
    """
    payload = json.loads(message)['data']
    bids = payload['b'][:10]  # keep at most the first 10 bid levels
    asks = payload['a'][:10]  # keep at most the first 10 ask levels
    timestamp = pd.to_datetime(payload['E'], unit='ms')
    order_book_snapshots.append({
        'bids': bids,
        'asks': asks,
        'timestamp': timestamp,
    })
    predict_market_direction()


def predict_market_direction():
    """Append a raw prediction and, once the window is full, publish it.

    The raw prediction is 1 when the buffered buy quantity exceeds the
    buffered sell quantity, otherwise 0 (a tie counts as 0). When
    ``prediction_window`` holds ``maxlen`` entries, the smoothed direction is
    1 if more than half of them are 1, otherwise 0; it is then logged and
    handed to ``show_message``. Does nothing while ``trade_data`` is empty.
    """
    if not trade_data:
        return

    # Total traded quantity per side over the buffered trades.
    buy_volume = sum(t['qty'] for t in trade_data if t['side'] == 'buy')
    sell_volume = sum(t['qty'] for t in trade_data if t['side'] == 'sell')

    # Simple rule: more buy volume predicts up, otherwise down.
    prediction_window.append(1 if buy_volume > sell_volume else 0)

    # Only emit once the smoothing window is completely filled.
    if len(prediction_window) != prediction_window.maxlen:
        return

    up_votes = sum(prediction_window)
    smoothed_prediction = 1 if up_votes > len(prediction_window) / 2 else 0
    logger.info("Predicted Market Direction (smoothed): %s",
                "Up" if smoothed_prediction == 1 else "Down")
    show_message(smoothed_prediction)


def _flagged_levels(levels, flag, reverse):
    """Return ``[price, 0]`` pairs sorted by price with the last one flagged.

    Args:
        levels: Iterable of ``(price, qty)`` pairs; the quantity is discarded.
        flag: Value stored in the second slot of the last sorted level.
        reverse: Sort prices descending when true, ascending otherwise.

    Returns:
        A new list of ``[float(price), 0]`` pairs. An empty input yields an
        empty list instead of raising ``IndexError``.
    """
    ordered = sorted(
        ([float(price), 0] for price, _qty in levels),
        key=lambda level: level[0],
        reverse=reverse,
    )
    if ordered:
        ordered[-1][1] = flag
    return ordered


def show_message(market_direction):
    """Queue a display payload built from the newest snapshot and trade.

    Asks are sorted by ascending price and bids by descending price, so the
    last entry of each list is the level furthest from the other side. For
    direction 1 the last ask is flagged with 1; for any other direction the
    last bid is flagged with 1. All other levels carry 0.

    Nothing is queued while either ``order_book_snapshots`` or ``trade_data``
    is empty. A snapshot side with no levels is published as an empty list.

    Args:
        market_direction: 1 for up; any other value is treated as down.
    """
    if not order_book_snapshots or not trade_data:
        return

    latest_order_book = order_book_snapshots[-1]
    latest_trade = trade_data[-1]

    going_up = market_direction == 1
    asks_sorted = _flagged_levels(
        latest_order_book['asks'], 1 if going_up else 0, reverse=False)
    bids_sorted = _flagged_levels(
        latest_order_book['bids'], 0 if going_up else 1, reverse=True)

    messages.put({
        "asks": asks_sorted,
        "bids": bids_sorted,
        "last_price": latest_trade['price'],
        # The trade quantity is not forwarded; the payload always carries 0.
        "last_qty": 0,
        "side": latest_trade['side'],
        "time": int(time.time() * 1000),
    })