message.go 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335
  1. package p2p
  2. import (
  3. "bufio"
  4. "bytes"
  5. "encoding/binary"
  6. "errors"
  7. "fmt"
  8. "io"
  9. "io/ioutil"
  10. "math/big"
  11. "net"
  12. "sync"
  13. "sync/atomic"
  14. "time"
  15. "github.com/ethereum/go-ethereum/ethutil"
  16. "github.com/ethereum/go-ethereum/rlp"
  17. )
  18. // Msg defines the structure of a p2p message.
  19. //
  20. // Note that a Msg can only be sent once since the Payload reader is
  21. // consumed during sending. It is not possible to create a Msg and
  22. // send it any number of times. If you want to reuse an encoded
  23. // structure, encode the payload into a byte array and create a
  24. // separate Msg with a bytes.Reader as Payload for each send.
  25. type Msg struct {
  26. Code uint64
  27. Size uint32 // size of the paylod
  28. Payload io.Reader
  29. }
  30. // NewMsg creates an RLP-encoded message with the given code.
  31. func NewMsg(code uint64, params ...interface{}) Msg {
  32. buf := new(bytes.Buffer)
  33. for _, p := range params {
  34. buf.Write(ethutil.Encode(p))
  35. }
  36. return Msg{Code: code, Size: uint32(buf.Len()), Payload: buf}
  37. }
  38. func encodePayload(params ...interface{}) []byte {
  39. buf := new(bytes.Buffer)
  40. for _, p := range params {
  41. buf.Write(ethutil.Encode(p))
  42. }
  43. return buf.Bytes()
  44. }
  45. // Decode parse the RLP content of a message into
  46. // the given value, which must be a pointer.
  47. //
  48. // For the decoding rules, please see package rlp.
  49. func (msg Msg) Decode(val interface{}) error {
  50. s := rlp.NewListStream(msg.Payload, uint64(msg.Size))
  51. if err := s.Decode(val); err != nil {
  52. return newPeerError(errInvalidMsg, "(code %#x) (size %d) %v", msg.Code, msg.Size, err)
  53. }
  54. return nil
  55. }
  56. func (msg Msg) String() string {
  57. return fmt.Sprintf("msg #%v (%v bytes)", msg.Code, msg.Size)
  58. }
  59. // Discard reads any remaining payload data into a black hole.
  60. func (msg Msg) Discard() error {
  61. _, err := io.Copy(ioutil.Discard, msg.Payload)
  62. return err
  63. }
  64. type MsgReader interface {
  65. ReadMsg() (Msg, error)
  66. }
  67. type MsgWriter interface {
  68. // WriteMsg sends a message. It will block until the message's
  69. // Payload has been consumed by the other end.
  70. //
  71. // Note that messages can be sent only once because their
  72. // payload reader is drained.
  73. WriteMsg(Msg) error
  74. }
  75. // MsgReadWriter provides reading and writing of encoded messages.
  76. // Implementations should ensure that ReadMsg and WriteMsg can be
  77. // called simultaneously from multiple goroutines.
  78. type MsgReadWriter interface {
  79. MsgReader
  80. MsgWriter
  81. }
  82. // EncodeMsg writes an RLP-encoded message with the given code and
  83. // data elements.
  84. func EncodeMsg(w MsgWriter, code uint64, data ...interface{}) error {
  85. return w.WriteMsg(NewMsg(code, data...))
  86. }
  87. // frameRW is a MsgReadWriter that reads and writes devp2p message frames.
  88. // As required by the interface, ReadMsg and WriteMsg can be called from
  89. // multiple goroutines.
  90. type frameRW struct {
  91. net.Conn // make Conn methods available. be careful.
  92. bufconn *bufio.ReadWriter
  93. // this channel is used to 'lend' bufconn to a caller of ReadMsg
  94. // until the message payload has been consumed. the channel
  95. // receives a value when EOF is reached on the payload, unblocking
  96. // a pending call to ReadMsg.
  97. rsync chan struct{}
  98. // this mutex guards writes to bufconn.
  99. writeMu sync.Mutex
  100. }
  101. func newFrameRW(conn net.Conn, timeout time.Duration) *frameRW {
  102. rsync := make(chan struct{}, 1)
  103. rsync <- struct{}{}
  104. return &frameRW{
  105. Conn: conn,
  106. bufconn: bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)),
  107. rsync: rsync,
  108. }
  109. }
  110. var magicToken = []byte{34, 64, 8, 145}
  111. func (rw *frameRW) WriteMsg(msg Msg) error {
  112. rw.writeMu.Lock()
  113. defer rw.writeMu.Unlock()
  114. rw.SetWriteDeadline(time.Now().Add(msgWriteTimeout))
  115. if err := writeMsg(rw.bufconn, msg); err != nil {
  116. return err
  117. }
  118. return rw.bufconn.Flush()
  119. }
  120. func writeMsg(w io.Writer, msg Msg) error {
  121. // TODO: handle case when Size + len(code) + len(listhdr) overflows uint32
  122. code := ethutil.Encode(uint32(msg.Code))
  123. listhdr := makeListHeader(msg.Size + uint32(len(code)))
  124. payloadLen := uint32(len(listhdr)) + uint32(len(code)) + msg.Size
  125. start := make([]byte, 8)
  126. copy(start, magicToken)
  127. binary.BigEndian.PutUint32(start[4:], payloadLen)
  128. for _, b := range [][]byte{start, listhdr, code} {
  129. if _, err := w.Write(b); err != nil {
  130. return err
  131. }
  132. }
  133. _, err := io.CopyN(w, msg.Payload, int64(msg.Size))
  134. return err
  135. }
  136. func makeListHeader(length uint32) []byte {
  137. if length < 56 {
  138. return []byte{byte(length + 0xc0)}
  139. }
  140. enc := big.NewInt(int64(length)).Bytes()
  141. lenb := byte(len(enc)) + 0xf7
  142. return append([]byte{lenb}, enc...)
  143. }
  144. func (rw *frameRW) ReadMsg() (msg Msg, err error) {
  145. <-rw.rsync // wait until bufconn is ours
  146. // this read timeout applies also to the payload.
  147. // TODO: proper read timeout
  148. rw.SetReadDeadline(time.Now().Add(msgReadTimeout))
  149. // read magic and payload size
  150. start := make([]byte, 8)
  151. if _, err = io.ReadFull(rw.bufconn, start); err != nil {
  152. return msg, err
  153. }
  154. if !bytes.HasPrefix(start, magicToken) {
  155. return msg, fmt.Errorf("bad magic token %x", start[:4], magicToken)
  156. }
  157. size := binary.BigEndian.Uint32(start[4:])
  158. // decode start of RLP message to get the message code
  159. posr := &postrack{rw.bufconn, 0}
  160. s := rlp.NewStream(posr)
  161. if _, err := s.List(); err != nil {
  162. return msg, err
  163. }
  164. msg.Code, err = s.Uint()
  165. if err != nil {
  166. return msg, err
  167. }
  168. msg.Size = size - posr.p
  169. if msg.Size <= wholePayloadSize {
  170. // msg is small, read all of it and move on to the next message.
  171. pbuf := make([]byte, msg.Size)
  172. if _, err := io.ReadFull(rw.bufconn, pbuf); err != nil {
  173. return msg, err
  174. }
  175. rw.rsync <- struct{}{} // bufconn is available again
  176. msg.Payload = bytes.NewReader(pbuf)
  177. } else {
  178. // lend bufconn to the caller until it has
  179. // consumed the payload. eofSignal will send a value
  180. // on rw.rsync when EOF is reached.
  181. pr := &eofSignal{rw.bufconn, msg.Size, rw.rsync}
  182. msg.Payload = pr
  183. }
  184. return msg, nil
  185. }
  186. // postrack wraps an rlp.ByteReader with a position counter.
  187. type postrack struct {
  188. r rlp.ByteReader
  189. p uint32
  190. }
  191. func (r *postrack) Read(buf []byte) (int, error) {
  192. n, err := r.r.Read(buf)
  193. r.p += uint32(n)
  194. return n, err
  195. }
  196. func (r *postrack) ReadByte() (byte, error) {
  197. b, err := r.r.ReadByte()
  198. if err == nil {
  199. r.p++
  200. }
  201. return b, err
  202. }
  203. // eofSignal wraps a reader with eof signaling. the eof channel is
  204. // closed when the wrapped reader returns an error or when count bytes
  205. // have been read.
  206. type eofSignal struct {
  207. wrapped io.Reader
  208. count uint32 // number of bytes left
  209. eof chan<- struct{}
  210. }
  211. // note: when using eofSignal to detect whether a message payload
  212. // has been read, Read might not be called for zero sized messages.
  213. func (r *eofSignal) Read(buf []byte) (int, error) {
  214. if r.count == 0 {
  215. if r.eof != nil {
  216. r.eof <- struct{}{}
  217. r.eof = nil
  218. }
  219. return 0, io.EOF
  220. }
  221. max := len(buf)
  222. if int(r.count) < len(buf) {
  223. max = int(r.count)
  224. }
  225. n, err := r.wrapped.Read(buf[:max])
  226. r.count -= uint32(n)
  227. if (err != nil || r.count == 0) && r.eof != nil {
  228. r.eof <- struct{}{} // tell Peer that msg has been consumed
  229. r.eof = nil
  230. }
  231. return n, err
  232. }
  233. // MsgPipe creates a message pipe. Reads on one end are matched
  234. // with writes on the other. The pipe is full-duplex, both ends
  235. // implement MsgReadWriter.
  236. func MsgPipe() (*MsgPipeRW, *MsgPipeRW) {
  237. var (
  238. c1, c2 = make(chan Msg), make(chan Msg)
  239. closing = make(chan struct{})
  240. closed = new(int32)
  241. rw1 = &MsgPipeRW{c1, c2, closing, closed}
  242. rw2 = &MsgPipeRW{c2, c1, closing, closed}
  243. )
  244. return rw1, rw2
  245. }
  246. // ErrPipeClosed is returned from pipe operations after the
  247. // pipe has been closed.
  248. var ErrPipeClosed = errors.New("p2p: read or write on closed message pipe")
  249. // MsgPipeRW is an endpoint of a MsgReadWriter pipe.
  250. type MsgPipeRW struct {
  251. w chan<- Msg
  252. r <-chan Msg
  253. closing chan struct{}
  254. closed *int32
  255. }
  256. // WriteMsg sends a messsage on the pipe.
  257. // It blocks until the receiver has consumed the message payload.
  258. func (p *MsgPipeRW) WriteMsg(msg Msg) error {
  259. if atomic.LoadInt32(p.closed) == 0 {
  260. consumed := make(chan struct{}, 1)
  261. msg.Payload = &eofSignal{msg.Payload, msg.Size, consumed}
  262. select {
  263. case p.w <- msg:
  264. if msg.Size > 0 {
  265. // wait for payload read or discard
  266. <-consumed
  267. }
  268. return nil
  269. case <-p.closing:
  270. }
  271. }
  272. return ErrPipeClosed
  273. }
  274. // ReadMsg returns a message sent on the other end of the pipe.
  275. func (p *MsgPipeRW) ReadMsg() (Msg, error) {
  276. if atomic.LoadInt32(p.closed) == 0 {
  277. select {
  278. case msg := <-p.r:
  279. return msg, nil
  280. case <-p.closing:
  281. }
  282. }
  283. return Msg{}, ErrPipeClosed
  284. }
  285. // Close unblocks any pending ReadMsg and WriteMsg calls on both ends
  286. // of the pipe. They will return ErrPipeClosed. Note that Close does
  287. // not interrupt any reads from a message payload.
  288. func (p *MsgPipeRW) Close() error {
  289. if atomic.AddInt32(p.closed, 1) != 1 {
  290. // someone else is already closing
  291. atomic.StoreInt32(p.closed, 1) // avoid overflow
  292. return nil
  293. }
  294. close(p.closing)
  295. return nil
  296. }