| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105 |
- import json
- import time
- import numpy as np
- import pandas as pd
- import queue
- import threading
- from collections import deque
- from logger_config import logger
- # 假设我们有一个数据流,订单簿和成交数据
- order_book_snapshots = deque(maxlen=100) # 存储过去100ms的订单簿快照
- trade_data = deque(maxlen=100) # 存储过去100ms的成交数据
- prediction_window = deque(maxlen=10) # 用于平滑预测结果的滑动窗口
- # 数据积累的阈值
- DATA_THRESHOLD = 20
- messages = queue.Queue() # 创建一个线程安全队列
- stop_event = threading.Event()
- def on_message_trade(_ws, message):
- global trade_data
- json_message = json.loads(message)
- trade = {
- 'price': float(json_message['data']['p']),
- 'qty': float(json_message['data']['q']),
- 'timestamp': pd.to_datetime(json_message['data']['T'], unit='ms'),
- 'side': 'sell' if json_message['data']['m'] else 'buy'
- }
- trade_data.append(trade)
- predict_market_direction()
- def on_message_depth(_ws, message):
- global order_book_snapshots
- json_message = json.loads(message)
- bids = json_message['data']['b'][:10] # Top 10 bids
- asks = json_message['data']['a'][:10] # Top 10 asks
- timestamp = pd.to_datetime(json_message['data']['E'], unit='ms')
- order_book_snapshots.append({
- 'bids': bids,
- 'asks': asks,
- 'timestamp': timestamp
- })
- predict_market_direction()
- def predict_market_direction():
- global prediction_window
- if len(trade_data) == 0:
- return
- # 统计过去100ms内的买卖交易数量
- buy_count = sum(trade['qty'] for trade in trade_data if trade['side'] == 'buy')
- sell_count = sum(trade['qty'] for trade in trade_data if trade['side'] == 'sell')
- # 简单的预测逻辑:买单多则预测上涨,卖单多则预测下跌
- prediction = 1 if buy_count > sell_count else 0
- # 将预测结果添加到滑动窗口中
- prediction_window.append(prediction)
- # 根据滑动窗口中的多数结果确定当前的预测方向
- if len(prediction_window) == prediction_window.maxlen:
- smoothed_prediction = 1 if sum(prediction_window) > len(prediction_window) / 2 else 0
- logger.info("Predicted Market Direction (smoothed): %s", "Up" if smoothed_prediction == 1 else "Down")
- show_message(smoothed_prediction)
- def show_message(market_direction):
- global order_book_snapshots, trade_data
- if len(order_book_snapshots) > 0 and len(trade_data) > 0:
- # 获取最新的订单簿数据和成交数据
- latest_order_book = order_book_snapshots[-1]
- latest_trade = trade_data[-1]
- # 提取asks和bids数据
- asks = [[float(price), 0] for price, qty in latest_order_book['asks']]
- bids = [[float(price), 0] for price, qty in latest_order_book['bids']]
- # 排序asks和bids数据
- asks_sorted = sorted(asks, key=lambda x: x[0])
- asks_sorted[-1] = [asks_sorted[-1][0], 1 if market_direction == 1 else 0]
- bids_sorted = sorted(bids, key=lambda x: x[0], reverse=True)
- bids_sorted[-1] = [bids_sorted[-1][0], 0 if market_direction == 1 else 1]
- last_price = latest_trade['price']
- # last_qty = latest_trade['qty']
- last_qty = 0
- side = latest_trade['side']
- # 生成数据字典
- data = {
- "asks": asks_sorted,
- "bids": bids_sorted,
- "last_price": last_price,
- "last_qty": last_qty,
- "side": side,
- "time": int(time.time() * 1000)
- }
- # 将数据放入消息队列
- messages.put(data)
|