import zmq import logging from eth_utils import keccak import rlp import time import datetime import traceback def printTime(*args): print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3], *("%.6f" % a if isinstance(a, float) else a for a in args)) def pushText(str1, name): name= str(name) str1 = str(str1) file = open('./data/' + name + '.txt' , 'w') file.write(str1) file.close() def decodeRlpList(data): result = [] for i in range(0, len(data)): dataI = data[i] if type(dataI) == list: result.append(decodeRlpList(dataI)) elif dataI == b'': result.append('') elif dataI == []: result.append([]) else: value = dataI.hex() if value[:2] == '02' and len(value)>=64: raw = bytes.fromhex(value[2:]) try: rlpList = (rlp.decode(raw)) result.append(decodeRlpList(rlpList)) except: result.append([]) else: if len(value) < 39: value = int(value, 16) else: value = '0x' + value result.append(value) return result context = zmq.Context() socket = context.socket(zmq.SUB) socket.subscribe('ethereum') socket.connect("tcp://127.0.0.1:5502") initBlock = 0 payloadLIst = [] hsList = {} while True: # 阻塞式调用 # 目前看有断线重连 # 分三段接收所有消息 message_head = socket.recv() # 这个没什么用,主要是订阅消息 code = socket.recv() # 这个是message code payload = socket.recv() # 消息主体 if payload in payloadLIst: continue payloadLIst.append(payload) try: if code == b'1': block = int(payload[-6:].decode(),16) if initBlock != block: initBlock = block printTime('1: ' +str(initBlock)) elif code == b'8': hs = '0x'+payload.decode()[4:] # if hs not in hsList: # hsList[hs] = {} # hsList[hs]['8'] = time.time() # printTime('8: ' + hs) # else: # hsList[hs]['8'] = time.time() - list(hsList[hs].items())[0][1] # if len(hsList[hs]) >=3: # printTime(hs, hsList[hs]) elif code == b'2': raw = payload.decode() # raw = bytes.fromhex(raw) # rlpList = (rlp.decode(raw)) # if len(rlpList[0]) > 20 : # hs = keccak(rlpList[0]).hex() # else: # newRlp = rlp.encode(rlpList[0]) # hs = keccak(newRlp).hex() # hs = '0x' + hs # if hs not in hsList: # hsList[hs] = {} # hsList[hs]['2'] = time.time() # printTime('2: ' + hs) # else: # hsList[hs]['2'] = time.time() - list(hsList[hs].items())[0][1] # if len(hsList[hs]) >=3: # printTime(hs, hsList[hs]) elif code == b'10': raw = payload.decode() # raw = bytes.fromhex(raw) # rlpList = (rlp.decode(raw)) # if len(rlpList[1][0]) > 20 : # hs = keccak(rlpList[1][0]).hex() # else: # newRlp = rlp.encode(rlpList[1][0]) # hs = keccak(newRlp).hex() # hs = '0x' + hs # if hs not in hsList: # hsList[hs] = {} # hsList[hs]['10'] = time.time() # printTime('10: ' + hs) # else: # hsList[hs]['10'] = time.time() - list(hsList[hs].items())[0][1] # if len(hsList[hs]) >=3: # printTime(hs, hsList[hs]) elif code == b'7': raw = payload.decode() raw = bytes.fromhex(raw) rlpList = (rlp.decode(raw)) blockData = decodeRlpList(rlpList) blcokNumber = (blockData[0][0][-8]) transactionAmount = len(blockData[0][1]) printTime('7: ', (blcokNumber), 'amount', transactionAmount) elif code == b'16': raw = payload.decode() raw = bytes.fromhex(raw) rlpList = (rlp.decode(raw)) blockData = decodeRlpList(rlpList) printTime(code, 'amount', len(blockData[1][0])) pushText(blockData, 7) except BaseException as err: print(traceback.format_exc()) print(err) printTime('ERR', code, payload)