messaging.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  // Package wire provides low-level access to the Ethereum network and allows
  // you to broadcast data over the network.
  3. package wire
  import (
  	"bytes"
  	"fmt"
  	"io"
  	"net"
  	"time"

  	"github.com/ethereum/go-ethereum/ethutil"
  )
  11. // Connection interface describing the methods required to implement the wire protocol.
  12. type Conn interface {
  13. Write(typ MsgType, v ...interface{}) error
  14. Read() *Msg
  15. }
  // MagicToken is the 4-byte marker that should open every message on the wire;
  // it can also be used as a separator between messages.
  var MagicToken = []byte{0x22, 0x40, 0x08, 0x91}
  // MsgType is the one-byte code that identifies the kind of a wire message.
  type MsgType byte
  // Message type codes.
  //
  // The codes are fixed by the wire protocol spec, so each one is written out
  // explicitly rather than generated with iota: explicit values are easier for
  // a human to check against the spec. The constants are untyped and convert
  // to MsgType wherever one is expected.
  const (
  	MsgHandshakeTy = 0x00
  	MsgDiscTy      = 0x01
  	MsgPingTy      = 0x02
  	MsgPongTy      = 0x03
  	MsgGetPeersTy  = 0x04
  	MsgPeersTy     = 0x05

  	MsgStatusTy = 0x10
  	//MsgGetTxsTy = 0x11
  	MsgTxTy             = 0x12
  	MsgGetBlockHashesTy = 0x13
  	MsgBlockHashesTy    = 0x14
  	MsgGetBlocksTy      = 0x15
  	MsgBlockTy          = 0x16
  	MsgNewBlockTy       = 0x17
  )
  38. var msgTypeToString = map[MsgType]string{
  39. MsgHandshakeTy: "Handshake",
  40. MsgDiscTy: "Disconnect",
  41. MsgPingTy: "Ping",
  42. MsgPongTy: "Pong",
  43. MsgGetPeersTy: "Get peers",
  44. MsgStatusTy: "Status",
  45. MsgPeersTy: "Peers",
  46. MsgTxTy: "Transactions",
  47. MsgBlockTy: "Blocks",
  48. //MsgGetTxsTy: "Get Txs",
  49. MsgGetBlockHashesTy: "Get block hashes",
  50. MsgBlockHashesTy: "Block hashes",
  51. MsgGetBlocksTy: "Get blocks",
  52. }
  53. func (mt MsgType) String() string {
  54. return msgTypeToString[mt]
  55. }
  56. type Msg struct {
  57. Type MsgType // Specifies how the encoded data should be interpreted
  58. //Data []byte
  59. Data *ethutil.Value
  60. }
  61. func NewMessage(msgType MsgType, data interface{}) *Msg {
  62. return &Msg{
  63. Type: msgType,
  64. Data: ethutil.NewValue(data),
  65. }
  66. }
  67. type Messages []*Msg
  68. // The basic message reader waits for data on the given connection, decoding
  69. // and doing a few sanity checks such as if there's a data type and
  70. // unmarhals the given data
  71. func ReadMessages(conn net.Conn) (msgs []*Msg, err error) {
  72. // The recovering function in case anything goes horribly wrong
  73. defer func() {
  74. if r := recover(); r != nil {
  75. err = fmt.Errorf("wire.ReadMessage error: %v", r)
  76. }
  77. }()
  78. var (
  79. buff []byte
  80. messages [][]byte
  81. msgLength int
  82. )
  83. for {
  84. // Give buffering some time
  85. conn.SetReadDeadline(time.Now().Add(5 * time.Millisecond))
  86. // Create a new temporarily buffer
  87. b := make([]byte, 1440)
  88. n, _ := conn.Read(b)
  89. if err != nil && n == 0 {
  90. if err.Error() != "EOF" {
  91. fmt.Println("err now", err)
  92. return nil, err
  93. } else {
  94. break
  95. }
  96. }
  97. if n == 0 && len(buff) == 0 {
  98. // If there's nothing on the wire wait for a bit
  99. time.Sleep(200 * time.Millisecond)
  100. continue
  101. }
  102. buff = append(buff, b[:n]...)
  103. if msgLength == 0 {
  104. // Check if the received 4 first bytes are the magic token
  105. if bytes.Compare(MagicToken, buff[:4]) != 0 {
  106. return nil, fmt.Errorf("MagicToken mismatch. Received %v", buff[:4])
  107. }
  108. // Read the length of the message
  109. msgLength = int(ethutil.BytesToNumber(buff[4:8]))
  110. // Remove the token and length
  111. buff = buff[8:]
  112. }
  113. if len(buff) >= msgLength {
  114. messages = append(messages, buff[:msgLength])
  115. buff = buff[msgLength:]
  116. msgLength = 0
  117. if len(buff) == 0 {
  118. break
  119. }
  120. }
  121. }
  122. for _, m := range messages {
  123. decoder := ethutil.NewValueFromBytes(m)
  124. // Type of message
  125. t := decoder.Get(0).Uint()
  126. // Actual data
  127. d := decoder.SliceFrom(1)
  128. msgs = append(msgs, &Msg{Type: MsgType(t), Data: d})
  129. }
  130. return
  131. }
  132. // The basic message writer takes care of writing data over the given
  133. // connection and does some basic error checking
  134. func WriteMessage(conn net.Conn, msg *Msg) error {
  135. var pack []byte
  136. // Encode the type and the (RLP encoded) data for sending over the wire
  137. encoded := ethutil.NewValue(append([]interface{}{byte(msg.Type)}, msg.Data.Slice()...)).Encode()
  138. payloadLength := ethutil.NumberToBytes(uint32(len(encoded)), 32)
  139. // Write magic token and payload length (first 8 bytes)
  140. pack = append(MagicToken, payloadLength...)
  141. pack = append(pack, encoded...)
  142. //fmt.Printf("payload %v (%v) %q\n", msg.Type, conn.RemoteAddr(), encoded)
  143. // Write to the connection
  144. _, err := conn.Write(pack)
  145. if err != nil {
  146. return err
  147. }
  148. return nil
  149. }