| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148 |
import json
import logging
import threading
import time
import warnings

import colorlog
import numpy as np
import pandas as pd
import websocket
from scipy.optimize import minimize
# Ignore every FutureWarning raised anywhere in the process.
warnings.simplefilter('ignore', FutureWarning)

# Logging configuration: one coloured stream handler on a dedicated logger.
_LOG_FORMAT = "%(log_color)s%(asctime)s - %(name)s - %(levelname)s \n %(message)s"
_LOG_COLORS = {
    'DEBUG': 'cyan',
    'INFO': 'blue',
    'WARNING': 'yellow',
    'ERROR': 'red',
    'CRITICAL': 'bold_red',
}

handler = colorlog.StreamHandler()
handler.setFormatter(
    colorlog.ColoredFormatter(
        _LOG_FORMAT,
        datefmt=None,
        reset=True,
        log_colors=_LOG_COLORS,
    )
)

logger = logging.getLogger("binance_gp_demo")
logger.setLevel(logging.INFO)
logger.addHandler(handler)
# Step 2: subscribe to Binance trade data and order-book data.
# Binance WebSocket API URLs
SOCKET_TRADE = "wss://stream.binance.com:9443/ws/btcusdt@trade"
SOCKET_DEPTH = "wss://stream.binance.com:9443/ws/btcusdt@depth20@100ms"

# Initialize the DataFrames.
# df_trades is created with explicit column dtypes: concatenating typed rows
# onto an empty object-dtype frame relies on pandas ignoring empty entries when
# it picks the result dtype, which pandas has deprecated.
df_trades = pd.DataFrame({
    'price': pd.Series(dtype='float64'),
    'qty': pd.Series(dtype='float64'),
    'timestamp': pd.Series(dtype='datetime64[ns]'),
})
# df_order_book keeps untyped columns; on_message_depth stores lists in each cell.
df_order_book = pd.DataFrame(columns=['bid_price', 'bid_qty', 'ask_price', 'ask_qty'])
def on_message_trade(_ws, message):
    """Append one trade from the trade stream to the global ``df_trades``.

    Args:
        _ws: The WebSocket app that delivered the message (unused).
        message: JSON text with the keys ``'p'`` (price), ``'q'`` (quantity)
            and ``'T'`` (timestamp in milliseconds).
    """
    global df_trades
    payload = json.loads(message)
    row = pd.DataFrame([{
        'price': float(payload['p']),
        'qty': float(payload['q']),
        'timestamp': pd.to_datetime(payload['T'], unit='ms'),
    }])
    # Nothing to store if the row is empty or every field is missing.
    if row.empty or row.isna().all().all():
        return
    df_trades = pd.concat([df_trades, row], ignore_index=True)
# Function to handle order book messages
def on_message_depth(_ws, message):
    """Replace the global ``df_order_book`` with the latest depth snapshot.

    Only the first ten bid and ten ask levels are kept. The resulting frame
    has a single row whose four cells each hold a list of floats.

    Args:
        _ws: The WebSocket app that delivered the message (unused).
        message: JSON text with ``'bids'`` and ``'asks'`` arrays whose entries
            are indexable as ``[price, quantity]``.
    """
    global df_order_book

    def _column(levels, position):
        # Pull one field (0 = price, 1 = quantity) out of every level.
        return [float(level[position]) for level in levels]

    payload = json.loads(message)
    top_bids = payload['bids'][:10]  # Top 10 bids
    top_asks = payload['asks'][:10]  # Top 10 asks
    snapshot = {
        'bid_price': _column(top_bids, 0),
        'bid_qty': _column(top_bids, 1),
        'ask_price': _column(top_asks, 0),
        'ask_qty': _column(top_asks, 1),
    }
    df_order_book = pd.DataFrame([snapshot])
def on_error(_ws, error):
    """Log *error* at ERROR level on the module logger.

    Args:
        _ws: The WebSocket app that reported the error (unused).
        error: The error object or message to log.
    """
    logger.log(logging.ERROR, error)
def on_open(_ws):
    """Print a fixed banner to stdout when a connection opens.

    Args:
        _ws: The WebSocket app whose connection opened (unused).
    """
    banner = "### opened ###"
    print(banner)
# Create one WebSocket app per stream; both report failures through on_error.
# NOTE(review): on_open is not registered on either app — confirm whether
# that is intentional.
ws_trade = websocket.WebSocketApp(
    SOCKET_TRADE,
    on_message=on_message_trade,
    on_error=on_error,
)
ws_depth = websocket.WebSocketApp(
    SOCKET_DEPTH,
    on_message=on_message_depth,
    on_error=on_error,
)

# Parameters passed to run_forever: route both connections through an HTTP proxy.
http_proxy_host = "127.0.0.1"
http_proxy_port = 7890
proxy_type = "http"
_proxy_kwargs = {
    'http_proxy_host': http_proxy_host,
    'http_proxy_port': http_proxy_port,
    'proxy_type': proxy_type,
}

# Run each WebSocket in its own thread with the proxy settings.
# Each thread receives its own copy of the keyword arguments.
trade_thread = threading.Thread(target=ws_trade.run_forever, kwargs=dict(_proxy_kwargs))
depth_thread = threading.Thread(target=ws_depth.run_forever, kwargs=dict(_proxy_kwargs))
trade_thread.start()
depth_thread.start()
# Step 3: model the spread as a stochastic process.
# States of the process, in the row/column order used by transition_matrix.
states = ['tight', 'normal', 'wide']
transition_matrix = np.zeros((3, 3))


# Update the transition matrix.
def update_transition_matrix(df):
    """Re-estimate the global ``transition_matrix`` from ``df['state']``.

    Each pair of consecutive rows counts as one transition from the earlier
    row's state to the later row's state. Counts are rebuilt from scratch on
    every call and then normalised per row, so the result depends only on
    *df*. (Adding new counts onto the previously normalised matrix would mix
    probabilities with counts and re-count old rows on every call.)

    Rows of the matrix with no observed transition stay all zero. Pairs in
    which either label is not listed in ``states`` (for example a missing
    value on a row that has not been classified yet) are skipped.

    Args:
        df: DataFrame whose ``state`` column holds labels from ``states``.
            A frame with fewer than two rows, or without a ``state`` column,
            yields an all-zero matrix.
    """
    global transition_matrix
    position = {label: idx for idx, label in enumerate(states)}
    counts = np.zeros((len(states), len(states)))

    if 'state' in df.columns and len(df) > 1:
        labels = df['state'].tolist()
        for current_state, next_state in zip(labels, labels[1:]):
            row = position.get(current_state)
            col = position.get(next_state)
            if row is None or col is None:
                continue  # unknown or missing label: not a usable transition
            counts[row, col] += 1

    row_sums = counts.sum(axis=1, keepdims=True)
    row_sums[row_sums == 0] = 1  # avoid 0/0 for states that were never left
    transition_matrix = counts / row_sums
# Classify a spread value into a state.
def classify_spread(spread, tight_threshold=0.01, normal_threshold=0.02):
    """Return the state label for *spread*.

    Args:
        spread: Numeric spread value.
        tight_threshold: Values strictly below this are ``'tight'``.
        normal_threshold: Values at or above ``tight_threshold`` and strictly
            below this are ``'normal'``.

    Returns:
        ``'tight'``, ``'normal'`` or ``'wide'``. Anything that is below
        neither threshold is ``'wide'``; that includes NaN, because every
        comparison with NaN is false.
    """
    if spread < tight_threshold:
        return 'tight'
    if spread < normal_threshold:
        return 'normal'
    return 'wide'
# Compute the spread and classify it.
def calculate_and_classify_spread():
    """Add ``spread`` and ``state`` columns to the global ``df_trades``.

    ``spread`` is the absolute difference between each row's price and the
    previous row's price, so the first row has no value. ``state`` is the
    result of ``classify_spread`` applied to every ``spread`` entry.
    """
    global df_trades
    price_moves = df_trades['price'].diff().abs()
    df_trades['spread'] = price_moves
    df_trades['state'] = df_trades['spread'].apply(classify_spread)
# Periodically update the transition matrix.
stop_event = threading.Event()


def update_transition_matrix_periodically(interval=5):
    """Refresh and log the transition matrix until ``stop_event`` is set.

    Each cycle classifies the trades collected so far, re-estimates the
    transition matrix, and logs the latest state together with the matrix.
    A failure inside one cycle is logged and the loop carries on, so one bad
    cycle does not end the updater for the rest of the run.

    Args:
        interval: Seconds to wait between cycles (default 5). Setting
            ``stop_event`` ends the wait early.
    """
    while not stop_event.is_set():
        try:
            calculate_and_classify_spread()
            # Take one reference so the update and the log line use the same frame.
            trades = df_trades
            update_transition_matrix(trades)
            current_state = trades['state'].iloc[-1] if not trades.empty else 'unknown'
            logger.info("Current State: %s\nTransition Matrix:\n%s\n", current_state, transition_matrix)
        except Exception:
            # Boundary of a long-running worker: report the failure and retry.
            logger.exception("Transition matrix update failed; retrying on the next cycle")
        stop_event.wait(interval)  # update once per interval
# Create and start the thread that runs the periodic transition-matrix update.
transition_matrix_update_thread = threading.Thread(
    target=update_transition_matrix_periodically,
)
transition_matrix_update_thread.start()
|