| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155 |
- import zmq
- import logging
- from eth_utils import keccak
- import rlp
- import time
- import datetime
- import traceback
- def printTime(*args):
- print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3],
- *("%.6f" % a if isinstance(a, float) else a
- for a in args))
- def pushText(str1, name):
- name= str(name)
- str1 = str(str1)
- file = open('./data/' + name + '.txt' , 'w')
- file.write(str1)
- file.close()
- def decodeRlpList(data):
- result = []
- for i in range(0, len(data)):
- dataI = data[i]
- if type(dataI) == list:
- result.append(decodeRlpList(dataI))
- elif dataI == b'':
- result.append('')
- elif dataI == []:
- result.append([])
- else:
- value = dataI.hex()
- if value[:2] == '02' and len(value)>=64:
- raw = bytes.fromhex(value[2:])
- try:
- rlpList = (rlp.decode(raw))
- result.append(decodeRlpList(rlpList))
- except:
- result.append([])
- else:
- if len(value) < 39:
- value = int(value, 16)
- else:
- value = '0x' + value
- result.append(value)
- return result
- context = zmq.Context()
- socket = context.socket(zmq.SUB)
- socket.subscribe('ethereum')
- socket.connect("tcp://127.0.0.1:5502")
- initBlock = 0
- payloadLIst = []
- hsList = {}
- while True:
- # 阻塞式调用
- # 目前看有断线重连
- # 分三段接收所有消息
- message_head = socket.recv() # 这个没什么用,主要是订阅消息
- code = socket.recv() # 这个是message code
- payload = socket.recv()
- # 消息主体
- if payload in payloadLIst:
- continue
-
- payloadLIst.append(payload)
- try:
- if code == b'1':
- block = int(payload[-6:].decode(),16)
- if initBlock != block:
- initBlock = block
- printTime('1: ' +str(initBlock))
- elif code == b'8':
- hs = '0x'+payload.decode()[4:]
-
- # if hs not in hsList:
- # hsList[hs] = {}
- # hsList[hs]['8'] = time.time()
- # printTime('8: ' + hs)
- # else:
- # hsList[hs]['8'] = time.time() - list(hsList[hs].items())[0][1]
- # if len(hsList[hs]) >=3:
- # printTime(hs, hsList[hs])
- elif code == b'2':
- raw = payload.decode()
- # raw = bytes.fromhex(raw)
- # rlpList = (rlp.decode(raw))
- # if len(rlpList[0]) > 20 :
- # hs = keccak(rlpList[0]).hex()
- # else:
- # newRlp = rlp.encode(rlpList[0])
- # hs = keccak(newRlp).hex()
-
- # hs = '0x' + hs
-
- # if hs not in hsList:
- # hsList[hs] = {}
- # hsList[hs]['2'] = time.time()
- # printTime('2: ' + hs)
- # else:
- # hsList[hs]['2'] = time.time() - list(hsList[hs].items())[0][1]
- # if len(hsList[hs]) >=3:
- # printTime(hs, hsList[hs])
- elif code == b'10':
- raw = payload.decode()
- # raw = bytes.fromhex(raw)
- # rlpList = (rlp.decode(raw))
- # if len(rlpList[1][0]) > 20 :
- # hs = keccak(rlpList[1][0]).hex()
- # else:
- # newRlp = rlp.encode(rlpList[1][0])
- # hs = keccak(newRlp).hex()
- # hs = '0x' + hs
-
- # if hs not in hsList:
- # hsList[hs] = {}
- # hsList[hs]['10'] = time.time()
- # printTime('10: ' + hs)
- # else:
- # hsList[hs]['10'] = time.time() - list(hsList[hs].items())[0][1]
- # if len(hsList[hs]) >=3:
- # printTime(hs, hsList[hs])
-
- elif code == b'7':
- raw = payload.decode()
- raw = bytes.fromhex(raw)
- rlpList = (rlp.decode(raw))
- blockData = decodeRlpList(rlpList)
- blcokNumber = (blockData[0][0][-8])
- transactionAmount = len(blockData[0][1])
- printTime('7: ', (blcokNumber), 'amount', transactionAmount)
- elif code == b'16':
- raw = payload.decode()
- raw = bytes.fromhex(raw)
- rlpList = (rlp.decode(raw))
- blockData = decodeRlpList(rlpList)
- printTime(code, 'amount', len(blockData[1][0]))
- pushText(blockData, 7)
- except BaseException as err:
- print(traceback.format_exc())
- print(err)
- printTime('ERR', code, payload)
|