message.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. package p2p
  2. import (
  3. "bytes"
  4. "encoding/binary"
  5. "io"
  6. "io/ioutil"
  7. "math/big"
  8. "github.com/ethereum/go-ethereum/ethutil"
  9. "github.com/ethereum/go-ethereum/rlp"
  10. )
  11. // Msg defines the structure of a p2p message.
  12. //
  13. // Note that a Msg can only be sent once since the Payload reader is
  14. // consumed during sending. It is not possible to create a Msg and
  15. // send it any number of times. If you want to reuse an encoded
  16. // structure, encode the payload into a byte array and create a
  17. // separate Msg with a bytes.Reader as Payload for each send.
  18. type Msg struct {
  19. Code uint64
  20. Size uint32 // size of the paylod
  21. Payload io.Reader
  22. }
  23. // NewMsg creates an RLP-encoded message with the given code.
  24. func NewMsg(code uint64, params ...interface{}) Msg {
  25. buf := new(bytes.Buffer)
  26. for _, p := range params {
  27. buf.Write(ethutil.Encode(p))
  28. }
  29. return Msg{Code: code, Size: uint32(buf.Len()), Payload: buf}
  30. }
  31. func encodePayload(params ...interface{}) []byte {
  32. buf := new(bytes.Buffer)
  33. for _, p := range params {
  34. buf.Write(ethutil.Encode(p))
  35. }
  36. return buf.Bytes()
  37. }
  38. // Value returns the decoded RLP payload items in a message.
  39. func (msg Msg) Value() (*ethutil.Value, error) {
  40. var v []interface{}
  41. err := msg.Decode(&v)
  42. return ethutil.NewValue(v), err
  43. }
  44. // Decode parse the RLP content of a message into
  45. // the given value, which must be a pointer.
  46. //
  47. // For the decoding rules, please see package rlp.
  48. func (msg Msg) Decode(val interface{}) error {
  49. s := rlp.NewListStream(msg.Payload, uint64(msg.Size))
  50. return s.Decode(val)
  51. }
  52. // Discard reads any remaining payload data into a black hole.
  53. func (msg Msg) Discard() error {
  54. _, err := io.Copy(ioutil.Discard, msg.Payload)
  55. return err
  56. }
  57. type MsgReader interface {
  58. ReadMsg() (Msg, error)
  59. }
  60. type MsgWriter interface {
  61. // WriteMsg sends an existing message.
  62. // The Payload reader of the message is consumed.
  63. // Note that messages can be sent only once.
  64. WriteMsg(Msg) error
  65. // EncodeMsg writes an RLP-encoded message with the given
  66. // code and data elements.
  67. EncodeMsg(code uint64, data ...interface{}) error
  68. }
  69. // MsgReadWriter provides reading and writing of encoded messages.
  70. type MsgReadWriter interface {
  71. MsgReader
  72. MsgWriter
  73. }
  74. // MsgLoop reads messages off the given reader and
  75. // calls the handler function for each decoded message until
  76. // it returns an error or the peer connection is closed.
  77. //
  78. // If a message is larger than the given maximum size,
  79. // MsgLoop returns an appropriate error.
  80. func MsgLoop(r MsgReader, maxsize uint32, f func(code uint64, data *ethutil.Value) error) error {
  81. for {
  82. msg, err := r.ReadMsg()
  83. if err != nil {
  84. return err
  85. }
  86. if msg.Size > maxsize {
  87. return newPeerError(errInvalidMsg, "size %d exceeds maximum size of %d", msg.Size, maxsize)
  88. }
  89. value, err := msg.Value()
  90. if err != nil {
  91. return err
  92. }
  93. if err := f(msg.Code, value); err != nil {
  94. return err
  95. }
  96. }
  97. }
  98. var magicToken = []byte{34, 64, 8, 145}
  99. func writeMsg(w io.Writer, msg Msg) error {
  100. // TODO: handle case when Size + len(code) + len(listhdr) overflows uint32
  101. code := ethutil.Encode(uint32(msg.Code))
  102. listhdr := makeListHeader(msg.Size + uint32(len(code)))
  103. payloadLen := uint32(len(listhdr)) + uint32(len(code)) + msg.Size
  104. start := make([]byte, 8)
  105. copy(start, magicToken)
  106. binary.BigEndian.PutUint32(start[4:], payloadLen)
  107. for _, b := range [][]byte{start, listhdr, code} {
  108. if _, err := w.Write(b); err != nil {
  109. return err
  110. }
  111. }
  112. _, err := io.CopyN(w, msg.Payload, int64(msg.Size))
  113. return err
  114. }
  115. func makeListHeader(length uint32) []byte {
  116. if length < 56 {
  117. return []byte{byte(length + 0xc0)}
  118. }
  119. enc := big.NewInt(int64(length)).Bytes()
  120. lenb := byte(len(enc)) + 0xf7
  121. return append([]byte{lenb}, enc...)
  122. }
  123. // readMsg reads a message header from r.
  124. // It takes an rlp.ByteReader to ensure that the decoding doesn't buffer.
  125. func readMsg(r rlp.ByteReader) (msg Msg, err error) {
  126. // read magic and payload size
  127. start := make([]byte, 8)
  128. if _, err = io.ReadFull(r, start); err != nil {
  129. return msg, newPeerError(errRead, "%v", err)
  130. }
  131. if !bytes.HasPrefix(start, magicToken) {
  132. return msg, newPeerError(errMagicTokenMismatch, "got %x, want %x", start[:4], magicToken)
  133. }
  134. size := binary.BigEndian.Uint32(start[4:])
  135. // decode start of RLP message to get the message code
  136. posr := &postrack{r, 0}
  137. s := rlp.NewStream(posr)
  138. if _, err := s.List(); err != nil {
  139. return msg, err
  140. }
  141. code, err := s.Uint()
  142. if err != nil {
  143. return msg, err
  144. }
  145. payloadsize := size - posr.p
  146. return Msg{code, payloadsize, io.LimitReader(r, int64(payloadsize))}, nil
  147. }
  148. // postrack wraps an rlp.ByteReader with a position counter.
  149. type postrack struct {
  150. r rlp.ByteReader
  151. p uint32
  152. }
  153. func (r *postrack) Read(buf []byte) (int, error) {
  154. n, err := r.r.Read(buf)
  155. r.p += uint32(n)
  156. return n, err
  157. }
  158. func (r *postrack) ReadByte() (byte, error) {
  159. b, err := r.r.ReadByte()
  160. if err == nil {
  161. r.p++
  162. }
  163. return b, err
  164. }