peer_test.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. package p2p
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "math/rand"
  7. "net"
  8. "reflect"
  9. "testing"
  10. "time"
  11. )
  12. var discard = Protocol{
  13. Name: "discard",
  14. Length: 1,
  15. Run: func(p *Peer, rw MsgReadWriter) error {
  16. for {
  17. msg, err := rw.ReadMsg()
  18. if err != nil {
  19. return err
  20. }
  21. fmt.Printf("discarding %d\n", msg.Code)
  22. if err = msg.Discard(); err != nil {
  23. return err
  24. }
  25. }
  26. },
  27. }
  28. func testPeer(protos []Protocol) (func(), *conn, *Peer, <-chan DiscReason) {
  29. fd1, _ := net.Pipe()
  30. hs1 := &protoHandshake{ID: randomID(), Version: baseProtocolVersion}
  31. hs2 := &protoHandshake{ID: randomID(), Version: baseProtocolVersion}
  32. for _, p := range protos {
  33. hs1.Caps = append(hs1.Caps, p.cap())
  34. hs2.Caps = append(hs2.Caps, p.cap())
  35. }
  36. p1, p2 := MsgPipe()
  37. peer := newPeer(fd1, &conn{p1, hs1}, protos)
  38. errc := make(chan DiscReason, 1)
  39. go func() { errc <- peer.run() }()
  40. closer := func() {
  41. p1.Close()
  42. fd1.Close()
  43. }
  44. return closer, &conn{p2, hs2}, peer, errc
  45. }
  46. func TestPeerProtoReadMsg(t *testing.T) {
  47. done := make(chan struct{})
  48. proto := Protocol{
  49. Name: "a",
  50. Length: 5,
  51. Run: func(peer *Peer, rw MsgReadWriter) error {
  52. if err := ExpectMsg(rw, 2, []uint{1}); err != nil {
  53. t.Error(err)
  54. }
  55. if err := ExpectMsg(rw, 3, []uint{2}); err != nil {
  56. t.Error(err)
  57. }
  58. if err := ExpectMsg(rw, 4, []uint{3}); err != nil {
  59. t.Error(err)
  60. }
  61. close(done)
  62. return nil
  63. },
  64. }
  65. closer, rw, _, errc := testPeer([]Protocol{proto})
  66. defer closer()
  67. Send(rw, baseProtocolLength+2, []uint{1})
  68. Send(rw, baseProtocolLength+3, []uint{2})
  69. Send(rw, baseProtocolLength+4, []uint{3})
  70. select {
  71. case <-done:
  72. case err := <-errc:
  73. t.Errorf("peer returned: %v", err)
  74. case <-time.After(2 * time.Second):
  75. t.Errorf("receive timeout")
  76. }
  77. }
  78. func TestPeerProtoEncodeMsg(t *testing.T) {
  79. proto := Protocol{
  80. Name: "a",
  81. Length: 2,
  82. Run: func(peer *Peer, rw MsgReadWriter) error {
  83. if err := SendItems(rw, 2); err == nil {
  84. t.Error("expected error for out-of-range msg code, got nil")
  85. }
  86. if err := SendItems(rw, 1, "foo", "bar"); err != nil {
  87. t.Errorf("write error: %v", err)
  88. }
  89. return nil
  90. },
  91. }
  92. closer, rw, _, _ := testPeer([]Protocol{proto})
  93. defer closer()
  94. if err := ExpectMsg(rw, 17, []string{"foo", "bar"}); err != nil {
  95. t.Error(err)
  96. }
  97. }
  98. func TestPeerWriteForBroadcast(t *testing.T) {
  99. closer, rw, peer, peerErr := testPeer([]Protocol{discard})
  100. defer closer()
  101. emptymsg := func(code uint64) Msg {
  102. return Msg{Code: code, Size: 0, Payload: bytes.NewReader(nil)}
  103. }
  104. // test write errors
  105. if err := peer.writeProtoMsg("b", emptymsg(3)); err == nil {
  106. t.Errorf("expected error for unknown protocol, got nil")
  107. }
  108. if err := peer.writeProtoMsg("discard", emptymsg(8)); err == nil {
  109. t.Errorf("expected error for out-of-range msg code, got nil")
  110. } else if perr, ok := err.(*peerError); !ok || perr.Code != errInvalidMsgCode {
  111. t.Errorf("wrong error for out-of-range msg code, got %#v", err)
  112. }
  113. // setup for reading the message on the other end
  114. read := make(chan struct{})
  115. go func() {
  116. if err := ExpectMsg(rw, 16, nil); err != nil {
  117. t.Error(err)
  118. }
  119. close(read)
  120. }()
  121. // test successful write
  122. if err := peer.writeProtoMsg("discard", emptymsg(0)); err != nil {
  123. t.Errorf("expect no error for known protocol: %v", err)
  124. }
  125. select {
  126. case <-read:
  127. case err := <-peerErr:
  128. t.Fatalf("peer stopped: %v", err)
  129. }
  130. }
  131. func TestPeerPing(t *testing.T) {
  132. closer, rw, _, _ := testPeer(nil)
  133. defer closer()
  134. if err := SendItems(rw, pingMsg); err != nil {
  135. t.Fatal(err)
  136. }
  137. if err := ExpectMsg(rw, pongMsg, nil); err != nil {
  138. t.Error(err)
  139. }
  140. }
  141. func TestPeerDisconnect(t *testing.T) {
  142. closer, rw, _, disc := testPeer(nil)
  143. defer closer()
  144. if err := SendItems(rw, discMsg, DiscQuitting); err != nil {
  145. t.Fatal(err)
  146. }
  147. select {
  148. case reason := <-disc:
  149. if reason != DiscQuitting {
  150. t.Errorf("run returned wrong reason: got %v, want %v", reason, DiscRequested)
  151. }
  152. case <-time.After(500 * time.Millisecond):
  153. t.Error("peer did not return")
  154. }
  155. }
  156. // This test is supposed to verify that Peer can reliably handle
  157. // multiple causes of disconnection occurring at the same time.
  158. func TestPeerDisconnectRace(t *testing.T) {
  159. maybe := func() bool { return rand.Intn(1) == 1 }
  160. for i := 0; i < 1000; i++ {
  161. protoclose := make(chan error)
  162. protodisc := make(chan DiscReason)
  163. closer, rw, p, disc := testPeer([]Protocol{
  164. {
  165. Name: "closereq",
  166. Run: func(p *Peer, rw MsgReadWriter) error { return <-protoclose },
  167. Length: 1,
  168. },
  169. {
  170. Name: "disconnect",
  171. Run: func(p *Peer, rw MsgReadWriter) error { p.Disconnect(<-protodisc); return nil },
  172. Length: 1,
  173. },
  174. })
  175. // Simulate incoming messages.
  176. go SendItems(rw, baseProtocolLength+1)
  177. go SendItems(rw, baseProtocolLength+2)
  178. // Close the network connection.
  179. go closer()
  180. // Make protocol "closereq" return.
  181. protoclose <- errors.New("protocol closed")
  182. // Make protocol "disconnect" call peer.Disconnect
  183. protodisc <- DiscAlreadyConnected
  184. // In some cases, simulate something else calling peer.Disconnect.
  185. if maybe() {
  186. go p.Disconnect(DiscInvalidIdentity)
  187. }
  188. // In some cases, simulate remote requesting a disconnect.
  189. if maybe() {
  190. go SendItems(rw, discMsg, DiscQuitting)
  191. }
  192. select {
  193. case <-disc:
  194. case <-time.After(2 * time.Second):
  195. // Peer.run should return quickly. If it doesn't the Peer
  196. // goroutines are probably deadlocked. Call panic in order to
  197. // show the stacks.
  198. panic("Peer.run took to long to return.")
  199. }
  200. }
  201. }
  202. func TestNewPeer(t *testing.T) {
  203. name := "nodename"
  204. caps := []Cap{{"foo", 2}, {"bar", 3}}
  205. id := randomID()
  206. p := NewPeer(id, name, caps)
  207. if p.ID() != id {
  208. t.Errorf("ID mismatch: got %v, expected %v", p.ID(), id)
  209. }
  210. if p.Name() != name {
  211. t.Errorf("Name mismatch: got %v, expected %v", p.Name(), name)
  212. }
  213. if !reflect.DeepEqual(p.Caps(), caps) {
  214. t.Errorf("Caps mismatch: got %v, expected %v", p.Caps(), caps)
  215. }
  216. p.Disconnect(DiscAlreadyConnected) // Should not hang
  217. }