| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859 |
- import json
- import threading
- from websocket import create_connection, WebSocketConnectionClosedException
- from web3.providers.base import JSONBaseProvider
- from web3.exceptions import ProviderConnectionError
- # Thread-local storage, so that each thread keeps its own WebSocket connection.
- thread_local_storage = threading.local()
class SyncWebSocketProvider(JSONBaseProvider):
    """
    A custom synchronous WebSocket provider for web3.py v6+.

    It uses the `websocket-client` library to provide a synchronous
    interface. Each thread that calls into a provider instance gets its own
    lazily created connection. Connections are kept per provider instance,
    so two providers pointing at different endpoints never pick up each
    other's socket.
    """

    def __init__(self, endpoint_uri: str, timeout: int = 10):
        """
        Store the connection settings; no socket is opened here.

        Args:
            endpoint_uri: WebSocket URL of the JSON-RPC endpoint.
            timeout: Timeout handed to `create_connection` (seconds, per
                the websocket-client API).
        """
        self.endpoint_uri = endpoint_uri
        self.timeout = timeout
        # Per-instance, per-thread connection store. A module-wide store
        # would hand one provider's socket to another provider that runs in
        # the same thread but targets a different endpoint.
        self._thread_local = threading.local()
        super().__init__()

    def get_local_connection(self):
        """
        Return the calling thread's connection, opening it on first use.

        Returns:
            The open websocket-client connection for the current thread.

        Raises:
            ProviderConnectionError: If the connection cannot be established.
        """
        conn = getattr(self._thread_local, 'ws_connection', None)
        if conn is None:
            try:
                conn = create_connection(
                    self.endpoint_uri,
                    timeout=self.timeout,
                )
            except Exception as e:
                raise ProviderConnectionError(
                    f"Could not connect to {self.endpoint_uri}: {e}"
                ) from e
            self._thread_local.ws_connection = conn
        return conn

    def _discard_local_connection(self) -> None:
        """Forget and close the calling thread's connection, if it has one."""
        conn = getattr(self._thread_local, 'ws_connection', None)
        if conn is None:
            return
        del self._thread_local.ws_connection
        try:
            conn.close()
        except Exception:
            # Best effort: the socket is usually already broken when we get
            # here, and a failing close() must not mask the original error.
            pass

    def _exchange(self, request_data):
        """
        Send one encoded request and return the raw reply.

        If the connection turns out to be closed (an exception from the
        socket, or an empty reply), it is discarded and the request is sent
        once more over a fresh connection.
        """
        for attempt in (1, 2):
            conn = self.get_local_connection()
            try:
                conn.send(request_data)
                response_raw = conn.recv()
            except WebSocketConnectionClosedException:
                self._discard_local_connection()
                if attempt == 2:
                    raise
                continue
            if response_raw:
                return response_raw
            # An empty payload is treated like a closed connection.
            self._discard_local_connection()
        raise ProviderConnectionError(
            f"No response received from {self.endpoint_uri}"
        )

    def make_request(self, method: str, params: list) -> dict:
        """
        Perform a JSON-RPC call over the calling thread's connection.

        Args:
            method: JSON-RPC method name.
            params: Positional parameters for the call.

        Returns:
            The decoded JSON-RPC response.

        Raises:
            ProviderConnectionError: If connecting, sending or receiving
                fails, including when the single reconnect attempt fails.
        """
        request_data = self.encode_rpc_request(method, params)

        try:
            response_raw = self._exchange(request_data)
        except ProviderConnectionError:
            # Already describes the failure; just make sure no half-used
            # connection stays cached.
            self._discard_local_connection()
            raise
        except Exception as e:
            # After a timeout or any other mid-request failure the socket
            # may still deliver the late reply; dropping it prevents that
            # reply from being read as the answer to the next request.
            self._discard_local_connection()
            raise ProviderConnectionError(
                f"Error making request to {self.endpoint_uri}: {e}"
            ) from e

        # recv() yields str for text frames and bytes for binary frames;
        # decode_rpc_response is given bytes in either case.
        if isinstance(response_raw, str):
            response_raw = response_raw.encode('utf-8')
        return self.decode_rpc_response(response_raw)
|