protocol.go 10 KB


  1. package eth
  2. import (
  3. "bytes"
  4. "fmt"
  5. "math/big"
  6. "github.com/ethereum/go-ethereum/core/types"
  7. "github.com/ethereum/go-ethereum/errs"
  8. "github.com/ethereum/go-ethereum/ethutil"
  9. "github.com/ethereum/go-ethereum/logger"
  10. "github.com/ethereum/go-ethereum/p2p"
  11. "github.com/ethereum/go-ethereum/rlp"
  12. )
  // Wire-level parameters of the eth protocol.
  const (
  	// ProtocolVersion is sent in the status handshake; a peer reporting a
  	// different value is rejected.
  	ProtocolVersion = 56
  	// NetworkId is sent in the status handshake; a peer reporting a
  	// different value is rejected.
  	NetworkId = 0
  	// ProtocolLength is handed to the p2p layer as the Length of the protocol.
  	ProtocolLength uint64 = 8
  	// ProtocolMaxMsgSize is the largest message size the handlers accept.
  	ProtocolMaxMsgSize = 10 << 20
  	// maxHashes caps the number of hashes requested from, or served to, a
  	// peer in one message.
  	maxHashes = 256
  	// maxBlocks caps the number of hashes read from one GetBlocksMsg.
  	maxBlocks = 64
  )
  // Message codes of the eth protocol. The values are written out explicitly;
  // they are the numbers that identify each message kind on the wire.
  const (
  	StatusMsg         = 0x00
  	GetTxMsg          = 0x01 // unused
  	TxMsg             = 0x02
  	GetBlockHashesMsg = 0x03
  	BlockHashesMsg    = 0x04
  	GetBlocksMsg      = 0x05
  	BlocksMsg         = 0x06
  	NewBlockMsg       = 0x07
  )
  // Error codes reported by the eth protocol handlers. Each code is a key of
  // errorToString, which supplies its human-readable description.
  const (
  	// ErrMsgTooLarge: the message size exceeds ProtocolMaxMsgSize.
  	ErrMsgTooLarge = iota
  	// ErrDecode: the message payload could not be decoded.
  	ErrDecode
  	// ErrInvalidMsgCode: the message code is not handled by this protocol.
  	ErrInvalidMsgCode
  	// ErrProtocolVersionMismatch: the remote status carries another protocol version.
  	ErrProtocolVersionMismatch
  	// ErrNetworkIdMismatch: the remote status carries another network id.
  	ErrNetworkIdMismatch
  	// ErrGenesisBlockMismatch: the remote status carries another genesis block.
  	ErrGenesisBlockMismatch
  	// ErrNoStatusMsg: the first message received was not a status message.
  	ErrNoStatusMsg
  	// ErrExtraStatusMsg: a status message arrived after the handshake.
  	ErrExtraStatusMsg
  )
  42. var errorToString = map[int]string{
  43. ErrMsgTooLarge: "Message too long",
  44. ErrDecode: "Invalid message",
  45. ErrInvalidMsgCode: "Invalid message code",
  46. ErrProtocolVersionMismatch: "Protocol version mismatch",
  47. ErrNetworkIdMismatch: "NetworkId mismatch",
  48. ErrGenesisBlockMismatch: "Genesis block mismatch",
  49. ErrNoStatusMsg: "No status message",
  50. ErrExtraStatusMsg: "Extra status message",
  51. }
  52. // ethProtocol represents the ethereum wire protocol
  53. // instance is running on each peer
  54. type ethProtocol struct {
  55. txPool txPool
  56. chainManager chainManager
  57. blockPool blockPool
  58. peer *p2p.Peer
  59. id string
  60. rw p2p.MsgReadWriter
  61. errors *errs.Errors
  62. }
  63. // backend is the interface the ethereum protocol backend should implement
  64. // used as an argument to EthProtocol
  65. type txPool interface {
  66. AddTransactions([]*types.Transaction)
  67. GetTransactions() types.Transactions
  68. }
  69. type chainManager interface {
  70. GetBlockHashesFromHash(hash []byte, amount uint64) (hashes [][]byte)
  71. GetBlock(hash []byte) (block *types.Block)
  72. Status() (td *big.Int, currentBlock []byte, genesisBlock []byte)
  73. }
  74. type blockPool interface {
  75. AddBlockHashes(next func() ([]byte, bool), peerId string)
  76. AddBlock(block *types.Block, peerId string)
  77. AddPeer(td *big.Int, currentBlock []byte, peerId string, requestHashes func([]byte) error, requestBlocks func([][]byte) error, peerError func(*errs.Error)) (best bool)
  78. RemovePeer(peerId string)
  79. }
  80. // message structs used for rlp decoding
  81. type newBlockMsgData struct {
  82. Block *types.Block
  83. TD *big.Int
  84. }
  // getBlockHashesMsgData is the decoded payload of a GetBlockHashesMsg.
  // requestBlockHashes sends the two values in this same order.
  type getBlockHashesMsgData struct {
  	Hash   []byte // hash the requested hashes are looked up from
  	Amount uint64 // number of hashes requested; the handler caps it at maxHashes
  }
  89. // main entrypoint, wrappers starting a server running the eth protocol
  90. // use this constructor to attach the protocol ("class") to server caps
  91. // the Dev p2p layer then runs the protocol instance on each peer
  92. func EthProtocol(txPool txPool, chainManager chainManager, blockPool blockPool) p2p.Protocol {
  93. return p2p.Protocol{
  94. Name: "eth",
  95. Version: ProtocolVersion,
  96. Length: ProtocolLength,
  97. Run: func(peer *p2p.Peer, rw p2p.MsgReadWriter) error {
  98. return runEthProtocol(txPool, chainManager, blockPool, peer, rw)
  99. },
  100. }
  101. }
  102. // the main loop that handles incoming messages
  103. // note RemovePeer in the post-disconnect hook
  104. func runEthProtocol(txPool txPool, chainManager chainManager, blockPool blockPool, peer *p2p.Peer, rw p2p.MsgReadWriter) (err error) {
  105. id := peer.ID()
  106. self := &ethProtocol{
  107. txPool: txPool,
  108. chainManager: chainManager,
  109. blockPool: blockPool,
  110. rw: rw,
  111. peer: peer,
  112. errors: &errs.Errors{
  113. Package: "ETH",
  114. Errors: errorToString,
  115. },
  116. id: fmt.Sprintf("%x", id[:8]),
  117. }
  118. err = self.handleStatus()
  119. if err == nil {
  120. self.propagateTxs()
  121. for {
  122. err = self.handle()
  123. if err != nil {
  124. self.blockPool.RemovePeer(self.id)
  125. break
  126. }
  127. }
  128. }
  129. return
  130. }
  131. func (self *ethProtocol) handle() error {
  132. msg, err := self.rw.ReadMsg()
  133. if err != nil {
  134. return err
  135. }
  136. if msg.Size > ProtocolMaxMsgSize {
  137. return self.protoError(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
  138. }
  139. // make sure that the payload has been fully consumed
  140. defer msg.Discard()
  141. switch msg.Code {
  142. case GetTxMsg: // ignore
  143. case StatusMsg:
  144. return self.protoError(ErrExtraStatusMsg, "")
  145. case TxMsg:
  146. // TODO: rework using lazy RLP stream
  147. var txs []*types.Transaction
  148. if err := msg.Decode(&txs); err != nil {
  149. return self.protoError(ErrDecode, "msg %v: %v", msg, err)
  150. }
  151. for _, tx := range txs {
  152. jsonlogger.LogJson(&logger.EthTxReceived{
  153. TxHash: ethutil.Bytes2Hex(tx.Hash()),
  154. RemoteId: self.peer.ID().String(),
  155. })
  156. }
  157. self.txPool.AddTransactions(txs)
  158. case GetBlockHashesMsg:
  159. var request getBlockHashesMsgData
  160. if err := msg.Decode(&request); err != nil {
  161. return self.protoError(ErrDecode, "->msg %v: %v", msg, err)
  162. }
  163. if request.Amount > maxHashes {
  164. request.Amount = maxHashes
  165. }
  166. hashes := self.chainManager.GetBlockHashesFromHash(request.Hash, request.Amount)
  167. return p2p.EncodeMsg(self.rw, BlockHashesMsg, ethutil.ByteSliceToInterface(hashes)...)
  168. case BlockHashesMsg:
  169. msgStream := rlp.NewStream(msg.Payload)
  170. if _, err := msgStream.List(); err != nil {
  171. return err
  172. }
  173. var i int
  174. iter := func() (hash []byte, ok bool) {
  175. hash, err := msgStream.Bytes()
  176. if err == rlp.EOL {
  177. return nil, false
  178. } else if err != nil {
  179. self.protoError(ErrDecode, "msg %v: after %v hashes : %v", msg, i, err)
  180. return nil, false
  181. }
  182. i++
  183. return hash, true
  184. }
  185. self.blockPool.AddBlockHashes(iter, self.id)
  186. case GetBlocksMsg:
  187. msgStream := rlp.NewStream(msg.Payload)
  188. if _, err := msgStream.List(); err != nil {
  189. return err
  190. }
  191. var blocks []interface{}
  192. var i int
  193. for {
  194. i++
  195. var hash []byte
  196. if err := msgStream.Decode(&hash); err != nil {
  197. if err == rlp.EOL {
  198. break
  199. } else {
  200. return self.protoError(ErrDecode, "msg %v: %v", msg, err)
  201. }
  202. }
  203. block := self.chainManager.GetBlock(hash)
  204. if block != nil {
  205. blocks = append(blocks, block)
  206. }
  207. if i == maxBlocks {
  208. break
  209. }
  210. }
  211. return p2p.EncodeMsg(self.rw, BlocksMsg, blocks...)
  212. case BlocksMsg:
  213. msgStream := rlp.NewStream(msg.Payload)
  214. if _, err := msgStream.List(); err != nil {
  215. return err
  216. }
  217. for {
  218. var block types.Block
  219. if err := msgStream.Decode(&block); err != nil {
  220. if err == rlp.EOL {
  221. break
  222. } else {
  223. return self.protoError(ErrDecode, "msg %v: %v", msg, err)
  224. }
  225. }
  226. self.blockPool.AddBlock(&block, self.id)
  227. }
  228. case NewBlockMsg:
  229. var request newBlockMsgData
  230. if err := msg.Decode(&request); err != nil {
  231. return self.protoError(ErrDecode, "msg %v: %v", msg, err)
  232. }
  233. hash := request.Block.Hash()
  234. fmt.Println("received block: %x", hash)
  235. _, chainHead, _ := self.chainManager.Status()
  236. jsonlogger.LogJson(&logger.EthChainReceivedNewBlock{
  237. BlockHash: ethutil.Bytes2Hex(hash),
  238. BlockNumber: request.Block.Number(), // this surely must be zero
  239. ChainHeadHash: ethutil.Bytes2Hex(chainHead),
  240. BlockPrevHash: ethutil.Bytes2Hex(request.Block.ParentHash()),
  241. RemoteId: self.peer.ID().String(),
  242. })
  243. // to simplify backend interface adding a new block
  244. // uses AddPeer followed by AddBlock only if peer is the best peer
  245. // (or selected as new best peer)
  246. if self.blockPool.AddPeer(request.TD, hash, self.id, self.requestBlockHashes, self.requestBlocks, self.protoErrorDisconnect) {
  247. self.blockPool.AddBlock(request.Block, self.id)
  248. }
  249. default:
  250. return self.protoError(ErrInvalidMsgCode, "%v", msg.Code)
  251. }
  252. return nil
  253. }
  // statusMsgData is the decoded payload of a StatusMsg. The fields are
  // declared in the same order in which statusMsg sends the values.
  type statusMsgData struct {
  	ProtocolVersion uint32   // must equal the local ProtocolVersion
  	NetworkId       uint32   // must equal the local NetworkId
  	TD              *big.Int // td reported by the peer; passed on to blockPool.AddPeer
  	CurrentBlock    []byte   // hash of the peer's current block
  	GenesisBlock    []byte   // must equal the local genesis block hash
  }
  261. func (self *ethProtocol) statusMsg() p2p.Msg {
  262. td, currentBlock, genesisBlock := self.chainManager.Status()
  263. return p2p.NewMsg(StatusMsg,
  264. uint32(ProtocolVersion),
  265. uint32(NetworkId),
  266. td,
  267. currentBlock,
  268. genesisBlock,
  269. )
  270. }
  271. func (self *ethProtocol) handleStatus() error {
  272. // send precanned status message
  273. if err := self.rw.WriteMsg(self.statusMsg()); err != nil {
  274. return err
  275. }
  276. // read and handle remote status
  277. msg, err := self.rw.ReadMsg()
  278. if err != nil {
  279. return err
  280. }
  281. if msg.Code != StatusMsg {
  282. return self.protoError(ErrNoStatusMsg, "first msg has code %x (!= %x)", msg.Code, StatusMsg)
  283. }
  284. if msg.Size > ProtocolMaxMsgSize {
  285. return self.protoError(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
  286. }
  287. var status statusMsgData
  288. if err := msg.Decode(&status); err != nil {
  289. return self.protoError(ErrDecode, "msg %v: %v", msg, err)
  290. }
  291. _, _, genesisBlock := self.chainManager.Status()
  292. if !bytes.Equal(status.GenesisBlock, genesisBlock) {
  293. return self.protoError(ErrGenesisBlockMismatch, "%x (!= %x)", status.GenesisBlock, genesisBlock)
  294. }
  295. if status.NetworkId != NetworkId {
  296. return self.protoError(ErrNetworkIdMismatch, "%d (!= %d)", status.NetworkId, NetworkId)
  297. }
  298. if ProtocolVersion != status.ProtocolVersion {
  299. return self.protoError(ErrProtocolVersionMismatch, "%d (!= %d)", status.ProtocolVersion, ProtocolVersion)
  300. }
  301. self.peer.Infof("Peer is [eth] capable (%d/%d). TD=%v H=%x\n", status.ProtocolVersion, status.NetworkId, status.TD, status.CurrentBlock[:4])
  302. self.blockPool.AddPeer(status.TD, status.CurrentBlock, self.id, self.requestBlockHashes, self.requestBlocks, self.protoErrorDisconnect)
  303. return nil
  304. }
  305. func (self *ethProtocol) requestBlockHashes(from []byte) error {
  306. self.peer.Debugf("fetching hashes (%d) %x...\n", maxHashes, from[0:4])
  307. return p2p.EncodeMsg(self.rw, GetBlockHashesMsg, interface{}(from), uint64(maxHashes))
  308. }
  309. func (self *ethProtocol) requestBlocks(hashes [][]byte) error {
  310. self.peer.Debugf("fetching %v blocks", len(hashes))
  311. return p2p.EncodeMsg(self.rw, GetBlocksMsg, ethutil.ByteSliceToInterface(hashes)...)
  312. }
  313. func (self *ethProtocol) protoError(code int, format string, params ...interface{}) (err *errs.Error) {
  314. err = self.errors.New(code, format, params...)
  315. err.Log(self.peer.Logger)
  316. return
  317. }
  318. func (self *ethProtocol) protoErrorDisconnect(err *errs.Error) {
  319. err.Log(self.peer.Logger)
  320. if err.Fatal() {
  321. self.peer.Disconnect(p2p.DiscSubprotocolError)
  322. }
  323. }
  324. func (self *ethProtocol) propagateTxs() {
  325. transactions := self.txPool.GetTransactions()
  326. iface := make([]interface{}, len(transactions))
  327. for i, transaction := range transactions {
  328. iface[i] = transaction
  329. }
  330. self.rw.WriteMsg(p2p.NewMsg(TxMsg, iface...))
  331. }