peer_test.go 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258
  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of go-ethereum.
  3. //
  4. // go-ethereum is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // go-ethereum is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with go-ethereum. If not, see <http://www.gnu.org/licenses/>.
  16. package whisper
  17. import (
  18. "testing"
  19. "time"
  20. "github.com/ethereum/go-ethereum/p2p"
  21. "github.com/ethereum/go-ethereum/p2p/discover"
  22. )
  23. type testPeer struct {
  24. client *Whisper
  25. stream *p2p.MsgPipeRW
  26. termed chan struct{}
  27. }
  28. func startTestPeer() *testPeer {
  29. // Create a simulated P2P remote peer and data streams to it
  30. remote := p2p.NewPeer(discover.NodeID{}, "", nil)
  31. tester, tested := p2p.MsgPipe()
  32. // Create a whisper client and connect with it to the tester peer
  33. client := New()
  34. client.Start()
  35. termed := make(chan struct{})
  36. go func() {
  37. defer client.Stop()
  38. defer close(termed)
  39. defer tested.Close()
  40. client.handlePeer(remote, tested)
  41. }()
  42. return &testPeer{
  43. client: client,
  44. stream: tester,
  45. termed: termed,
  46. }
  47. }
  48. func startTestPeerInited() (*testPeer, error) {
  49. peer := startTestPeer()
  50. if err := p2p.ExpectMsg(peer.stream, statusCode, []uint64{protocolVersion}); err != nil {
  51. peer.stream.Close()
  52. return nil, err
  53. }
  54. if err := p2p.SendItems(peer.stream, statusCode, protocolVersion); err != nil {
  55. peer.stream.Close()
  56. return nil, err
  57. }
  58. return peer, nil
  59. }
  60. func TestPeerStatusMessage(t *testing.T) {
  61. tester := startTestPeer()
  62. // Wait for the handshake status message and check it
  63. if err := p2p.ExpectMsg(tester.stream, statusCode, []uint64{protocolVersion}); err != nil {
  64. t.Fatalf("status message mismatch: %v", err)
  65. }
  66. // Terminate the node
  67. tester.stream.Close()
  68. select {
  69. case <-tester.termed:
  70. case <-time.After(time.Second):
  71. t.Fatalf("local close timed out")
  72. }
  73. }
  74. func TestPeerHandshakeFail(t *testing.T) {
  75. tester := startTestPeer()
  76. // Wait for and check the handshake
  77. if err := p2p.ExpectMsg(tester.stream, statusCode, []uint64{protocolVersion}); err != nil {
  78. t.Fatalf("status message mismatch: %v", err)
  79. }
  80. // Send an invalid handshake status and verify disconnect
  81. if err := p2p.SendItems(tester.stream, messagesCode); err != nil {
  82. t.Fatalf("failed to send malformed status: %v", err)
  83. }
  84. select {
  85. case <-tester.termed:
  86. case <-time.After(time.Second):
  87. t.Fatalf("remote close timed out")
  88. }
  89. }
  90. func TestPeerHandshakeSuccess(t *testing.T) {
  91. tester := startTestPeer()
  92. // Wait for and check the handshake
  93. if err := p2p.ExpectMsg(tester.stream, statusCode, []uint64{protocolVersion}); err != nil {
  94. t.Fatalf("status message mismatch: %v", err)
  95. }
  96. // Send a valid handshake status and make sure connection stays live
  97. if err := p2p.SendItems(tester.stream, statusCode, protocolVersion); err != nil {
  98. t.Fatalf("failed to send status: %v", err)
  99. }
  100. select {
  101. case <-tester.termed:
  102. t.Fatalf("valid handshake disconnected")
  103. case <-time.After(100 * time.Millisecond):
  104. }
  105. // Clean up the test
  106. tester.stream.Close()
  107. select {
  108. case <-tester.termed:
  109. case <-time.After(time.Second):
  110. t.Fatalf("local close timed out")
  111. }
  112. }
  113. func TestPeerSend(t *testing.T) {
  114. // Start a tester and execute the handshake
  115. tester, err := startTestPeerInited()
  116. if err != nil {
  117. t.Fatalf("failed to start initialized peer: %v", err)
  118. }
  119. defer tester.stream.Close()
  120. // Construct a message and inject into the tester
  121. message := NewMessage([]byte("peer broadcast test message"))
  122. envelope, err := message.Wrap(DefaultPoW, Options{
  123. TTL: DefaultTTL,
  124. })
  125. if err != nil {
  126. t.Fatalf("failed to wrap message: %v", err)
  127. }
  128. if err := tester.client.Send(envelope); err != nil {
  129. t.Fatalf("failed to send message: %v", err)
  130. }
  131. // Check that the message is eventually forwarded
  132. payload := []interface{}{envelope}
  133. if err := p2p.ExpectMsg(tester.stream, messagesCode, payload); err != nil {
  134. t.Fatalf("message mismatch: %v", err)
  135. }
  136. // Make sure that even with a re-insert, an empty batch is received
  137. if err := tester.client.Send(envelope); err != nil {
  138. t.Fatalf("failed to send message: %v", err)
  139. }
  140. if err := p2p.ExpectMsg(tester.stream, messagesCode, []interface{}{}); err != nil {
  141. t.Fatalf("message mismatch: %v", err)
  142. }
  143. }
  144. func TestPeerDeliver(t *testing.T) {
  145. // Start a tester and execute the handshake
  146. tester, err := startTestPeerInited()
  147. if err != nil {
  148. t.Fatalf("failed to start initialized peer: %v", err)
  149. }
  150. defer tester.stream.Close()
  151. // Watch for all inbound messages
  152. arrived := make(chan struct{}, 1)
  153. tester.client.Watch(Filter{
  154. Fn: func(message *Message) {
  155. arrived <- struct{}{}
  156. },
  157. })
  158. // Construct a message and deliver it to the tester peer
  159. message := NewMessage([]byte("peer broadcast test message"))
  160. envelope, err := message.Wrap(DefaultPoW, Options{
  161. TTL: DefaultTTL,
  162. })
  163. if err != nil {
  164. t.Fatalf("failed to wrap message: %v", err)
  165. }
  166. if err := p2p.Send(tester.stream, messagesCode, []*Envelope{envelope}); err != nil {
  167. t.Fatalf("failed to transfer message: %v", err)
  168. }
  169. // Check that the message is delivered upstream
  170. select {
  171. case <-arrived:
  172. case <-time.After(time.Second):
  173. t.Fatalf("message delivery timeout")
  174. }
  175. // Check that a resend is not delivered
  176. if err := p2p.Send(tester.stream, messagesCode, []*Envelope{envelope}); err != nil {
  177. t.Fatalf("failed to transfer message: %v", err)
  178. }
  179. select {
  180. case <-time.After(2 * transmissionCycle):
  181. case <-arrived:
  182. t.Fatalf("repeating message arrived")
  183. }
  184. }
  185. func TestPeerMessageExpiration(t *testing.T) {
  186. // Start a tester and execute the handshake
  187. tester, err := startTestPeerInited()
  188. if err != nil {
  189. t.Fatalf("failed to start initialized peer: %v", err)
  190. }
  191. defer tester.stream.Close()
  192. // Fetch the peer instance for later inspection
  193. tester.client.peerMu.RLock()
  194. if peers := len(tester.client.peers); peers != 1 {
  195. t.Fatalf("peer pool size mismatch: have %v, want %v", peers, 1)
  196. }
  197. var peer *peer
  198. for peer, _ = range tester.client.peers {
  199. break
  200. }
  201. tester.client.peerMu.RUnlock()
  202. // Construct a message and pass it through the tester
  203. message := NewMessage([]byte("peer test message"))
  204. envelope, err := message.Wrap(DefaultPoW, Options{
  205. TTL: time.Second,
  206. })
  207. if err != nil {
  208. t.Fatalf("failed to wrap message: %v", err)
  209. }
  210. if err := tester.client.Send(envelope); err != nil {
  211. t.Fatalf("failed to send message: %v", err)
  212. }
  213. payload := []interface{}{envelope}
  214. if err := p2p.ExpectMsg(tester.stream, messagesCode, payload); err != nil {
  215. t.Fatalf("message mismatch: %v", err)
  216. }
  217. // Check that the message is inside the cache
  218. if !peer.known.Has(envelope.Hash()) {
  219. t.Fatalf("message not found in cache")
  220. }
  221. // Discard messages until expiration and check cache again
  222. exp := time.Now().Add(time.Second + expirationCycle)
  223. for time.Now().Before(exp) {
  224. if err := p2p.ExpectMsg(tester.stream, messagesCode, []interface{}{}); err != nil {
  225. t.Fatalf("message mismatch: %v", err)
  226. }
  227. }
  228. if peer.known.Has(envelope.Hash()) {
  229. t.Fatalf("message not expired from cache")
  230. }
  231. }