peer_test.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242
  1. package whisper
  2. import (
  3. "testing"
  4. "time"
  5. "github.com/ethereum/go-ethereum/p2p"
  6. "github.com/ethereum/go-ethereum/p2p/discover"
  7. )
  8. type testPeer struct {
  9. client *Whisper
  10. stream *p2p.MsgPipeRW
  11. termed chan struct{}
  12. }
  13. func startTestPeer() *testPeer {
  14. // Create a simulated P2P remote peer and data streams to it
  15. remote := p2p.NewPeer(discover.NodeID{}, "", nil)
  16. tester, tested := p2p.MsgPipe()
  17. // Create a whisper client and connect with it to the tester peer
  18. client := New()
  19. client.Start()
  20. termed := make(chan struct{})
  21. go func() {
  22. defer client.Stop()
  23. defer close(termed)
  24. defer tested.Close()
  25. client.handlePeer(remote, tested)
  26. }()
  27. return &testPeer{
  28. client: client,
  29. stream: tester,
  30. termed: termed,
  31. }
  32. }
  33. func startTestPeerInited() (*testPeer, error) {
  34. peer := startTestPeer()
  35. if err := p2p.ExpectMsg(peer.stream, statusCode, []uint64{protocolVersion}); err != nil {
  36. peer.stream.Close()
  37. return nil, err
  38. }
  39. if err := p2p.SendItems(peer.stream, statusCode, protocolVersion); err != nil {
  40. peer.stream.Close()
  41. return nil, err
  42. }
  43. return peer, nil
  44. }
  45. func TestPeerStatusMessage(t *testing.T) {
  46. tester := startTestPeer()
  47. // Wait for the handshake status message and check it
  48. if err := p2p.ExpectMsg(tester.stream, statusCode, []uint64{protocolVersion}); err != nil {
  49. t.Fatalf("status message mismatch: %v", err)
  50. }
  51. // Terminate the node
  52. tester.stream.Close()
  53. select {
  54. case <-tester.termed:
  55. case <-time.After(time.Second):
  56. t.Fatalf("local close timed out")
  57. }
  58. }
  59. func TestPeerHandshakeFail(t *testing.T) {
  60. tester := startTestPeer()
  61. // Wait for and check the handshake
  62. if err := p2p.ExpectMsg(tester.stream, statusCode, []uint64{protocolVersion}); err != nil {
  63. t.Fatalf("status message mismatch: %v", err)
  64. }
  65. // Send an invalid handshake status and verify disconnect
  66. if err := p2p.SendItems(tester.stream, messagesCode); err != nil {
  67. t.Fatalf("failed to send malformed status: %v", err)
  68. }
  69. select {
  70. case <-tester.termed:
  71. case <-time.After(time.Second):
  72. t.Fatalf("remote close timed out")
  73. }
  74. }
  75. func TestPeerHandshakeSuccess(t *testing.T) {
  76. tester := startTestPeer()
  77. // Wait for and check the handshake
  78. if err := p2p.ExpectMsg(tester.stream, statusCode, []uint64{protocolVersion}); err != nil {
  79. t.Fatalf("status message mismatch: %v", err)
  80. }
  81. // Send a valid handshake status and make sure connection stays live
  82. if err := p2p.SendItems(tester.stream, statusCode, protocolVersion); err != nil {
  83. t.Fatalf("failed to send status: %v", err)
  84. }
  85. select {
  86. case <-tester.termed:
  87. t.Fatalf("valid handshake disconnected")
  88. case <-time.After(100 * time.Millisecond):
  89. }
  90. // Clean up the test
  91. tester.stream.Close()
  92. select {
  93. case <-tester.termed:
  94. case <-time.After(time.Second):
  95. t.Fatalf("local close timed out")
  96. }
  97. }
  98. func TestPeerSend(t *testing.T) {
  99. // Start a tester and execute the handshake
  100. tester, err := startTestPeerInited()
  101. if err != nil {
  102. t.Fatalf("failed to start initialized peer: %v", err)
  103. }
  104. defer tester.stream.Close()
  105. // Construct a message and inject into the tester
  106. message := NewMessage([]byte("peer broadcast test message"))
  107. envelope, err := message.Wrap(DefaultPoW, Options{
  108. TTL: DefaultTTL,
  109. })
  110. if err != nil {
  111. t.Fatalf("failed to wrap message: %v", err)
  112. }
  113. if err := tester.client.Send(envelope); err != nil {
  114. t.Fatalf("failed to send message: %v", err)
  115. }
  116. // Check that the message is eventually forwarded
  117. payload := []interface{}{envelope}
  118. if err := p2p.ExpectMsg(tester.stream, messagesCode, payload); err != nil {
  119. t.Fatalf("message mismatch: %v", err)
  120. }
  121. // Make sure that even with a re-insert, an empty batch is received
  122. if err := tester.client.Send(envelope); err != nil {
  123. t.Fatalf("failed to send message: %v", err)
  124. }
  125. if err := p2p.ExpectMsg(tester.stream, messagesCode, []interface{}{}); err != nil {
  126. t.Fatalf("message mismatch: %v", err)
  127. }
  128. }
  129. func TestPeerDeliver(t *testing.T) {
  130. // Start a tester and execute the handshake
  131. tester, err := startTestPeerInited()
  132. if err != nil {
  133. t.Fatalf("failed to start initialized peer: %v", err)
  134. }
  135. defer tester.stream.Close()
  136. // Watch for all inbound messages
  137. arrived := make(chan struct{}, 1)
  138. tester.client.Watch(Filter{
  139. Fn: func(message *Message) {
  140. arrived <- struct{}{}
  141. },
  142. })
  143. // Construct a message and deliver it to the tester peer
  144. message := NewMessage([]byte("peer broadcast test message"))
  145. envelope, err := message.Wrap(DefaultPoW, Options{
  146. TTL: DefaultTTL,
  147. })
  148. if err != nil {
  149. t.Fatalf("failed to wrap message: %v", err)
  150. }
  151. if err := p2p.Send(tester.stream, messagesCode, []*Envelope{envelope}); err != nil {
  152. t.Fatalf("failed to transfer message: %v", err)
  153. }
  154. // Check that the message is delivered upstream
  155. select {
  156. case <-arrived:
  157. case <-time.After(time.Second):
  158. t.Fatalf("message delivery timeout")
  159. }
  160. // Check that a resend is not delivered
  161. if err := p2p.Send(tester.stream, messagesCode, []*Envelope{envelope}); err != nil {
  162. t.Fatalf("failed to transfer message: %v", err)
  163. }
  164. select {
  165. case <-time.After(2 * transmissionCycle):
  166. case <-arrived:
  167. t.Fatalf("repeating message arrived")
  168. }
  169. }
  170. func TestPeerMessageExpiration(t *testing.T) {
  171. // Start a tester and execute the handshake
  172. tester, err := startTestPeerInited()
  173. if err != nil {
  174. t.Fatalf("failed to start initialized peer: %v", err)
  175. }
  176. defer tester.stream.Close()
  177. // Fetch the peer instance for later inspection
  178. tester.client.peerMu.RLock()
  179. if peers := len(tester.client.peers); peers != 1 {
  180. t.Fatalf("peer pool size mismatch: have %v, want %v", peers, 1)
  181. }
  182. var peer *peer
  183. for peer, _ = range tester.client.peers {
  184. break
  185. }
  186. tester.client.peerMu.RUnlock()
  187. // Construct a message and pass it through the tester
  188. message := NewMessage([]byte("peer test message"))
  189. envelope, err := message.Wrap(DefaultPoW, Options{
  190. TTL: time.Second,
  191. })
  192. if err != nil {
  193. t.Fatalf("failed to wrap message: %v", err)
  194. }
  195. if err := tester.client.Send(envelope); err != nil {
  196. t.Fatalf("failed to send message: %v", err)
  197. }
  198. payload := []interface{}{envelope}
  199. if err := p2p.ExpectMsg(tester.stream, messagesCode, payload); err != nil {
  200. t.Fatalf("message mismatch: %v", err)
  201. }
  202. // Check that the message is inside the cache
  203. if !peer.known.Has(envelope.Hash()) {
  204. t.Fatalf("message not found in cache")
  205. }
  206. // Discard messages until expiration and check cache again
  207. exp := time.Now().Add(time.Second + expirationCycle)
  208. for time.Now().Before(exp) {
  209. if err := p2p.ExpectMsg(tester.stream, messagesCode, []interface{}{}); err != nil {
  210. t.Fatalf("message mismatch: %v", err)
  211. }
  212. }
  213. if peer.known.Has(envelope.Hash()) {
  214. t.Fatalf("message not expired from cache")
  215. }
  216. }