protocol_test.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. package eth
  2. import (
  3. "crypto/rand"
  4. "math/big"
  5. "sync"
  6. "testing"
  7. "time"
  8. "github.com/ethereum/go-ethereum/common"
  9. "github.com/ethereum/go-ethereum/core"
  10. "github.com/ethereum/go-ethereum/core/types"
  11. "github.com/ethereum/go-ethereum/crypto"
  12. "github.com/ethereum/go-ethereum/ethdb"
  13. "github.com/ethereum/go-ethereum/event"
  14. "github.com/ethereum/go-ethereum/p2p"
  15. "github.com/ethereum/go-ethereum/p2p/discover"
  16. )
  17. func init() {
  18. // glog.SetToStderr(true)
  19. // glog.SetV(6)
  20. }
  21. var testAccount = crypto.NewKey(rand.Reader)
  22. func TestStatusMsgErrors(t *testing.T) {
  23. pm := newProtocolManagerForTesting(nil)
  24. td, currentBlock, genesis := pm.chainman.Status()
  25. defer pm.Stop()
  26. tests := []struct {
  27. code uint64
  28. data interface{}
  29. wantError error
  30. }{
  31. {
  32. code: TxMsg, data: []interface{}{},
  33. wantError: errResp(ErrNoStatusMsg, "first msg has code 2 (!= 0)"),
  34. },
  35. {
  36. code: StatusMsg, data: statusData{10, NetworkId, td, currentBlock, genesis},
  37. wantError: errResp(ErrProtocolVersionMismatch, "10 (!= 0)"),
  38. },
  39. {
  40. code: StatusMsg, data: statusData{uint32(ProtocolVersions[0]), 999, td, currentBlock, genesis},
  41. wantError: errResp(ErrNetworkIdMismatch, "999 (!= 0)"),
  42. },
  43. {
  44. code: StatusMsg, data: statusData{uint32(ProtocolVersions[0]), NetworkId, td, currentBlock, common.Hash{3}},
  45. wantError: errResp(ErrGenesisBlockMismatch, "0300000000000000000000000000000000000000000000000000000000000000 (!= %x)", genesis),
  46. },
  47. }
  48. for i, test := range tests {
  49. p, errc := newTestPeer(pm)
  50. // The send call might hang until reset because
  51. // the protocol might not read the payload.
  52. go p2p.Send(p, test.code, test.data)
  53. select {
  54. case err := <-errc:
  55. if err == nil {
  56. t.Errorf("test %d: protocol returned nil error, want %q", test.wantError)
  57. } else if err.Error() != test.wantError.Error() {
  58. t.Errorf("test %d: wrong error: got %q, want %q", i, err, test.wantError)
  59. }
  60. case <-time.After(2 * time.Second):
  61. t.Errorf("protocol did not shut down withing 2 seconds")
  62. }
  63. p.close()
  64. }
  65. }
  66. // This test checks that received transactions are added to the local pool.
  67. func TestRecvTransactions(t *testing.T) {
  68. txAdded := make(chan []*types.Transaction)
  69. pm := newProtocolManagerForTesting(txAdded)
  70. p, _ := newTestPeer(pm)
  71. defer pm.Stop()
  72. defer p.close()
  73. p.handshake(t)
  74. tx := newtx(testAccount, 0, 0)
  75. if err := p2p.Send(p, TxMsg, []interface{}{tx}); err != nil {
  76. t.Fatalf("send error: %v", err)
  77. }
  78. select {
  79. case added := <-txAdded:
  80. if len(added) != 1 {
  81. t.Errorf("wrong number of added transactions: got %d, want 1", len(added))
  82. } else if added[0].Hash() != tx.Hash() {
  83. t.Errorf("added wrong tx hash: got %v, want %v", added[0].Hash(), tx.Hash())
  84. }
  85. case <-time.After(2 * time.Second):
  86. t.Errorf("no TxPreEvent received within 2 seconds")
  87. }
  88. }
  89. // This test checks that pending transactions are sent.
  90. func TestSendTransactions(t *testing.T) {
  91. pm := newProtocolManagerForTesting(nil)
  92. defer pm.Stop()
  93. // Fill the pool with big transactions.
  94. const txsize = txsyncPackSize / 10
  95. alltxs := make([]*types.Transaction, 100)
  96. for nonce := range alltxs {
  97. alltxs[nonce] = newtx(testAccount, uint64(nonce), txsize)
  98. }
  99. pm.txpool.AddTransactions(alltxs)
  100. // Connect several peers. They should all receive the pending transactions.
  101. var wg sync.WaitGroup
  102. checktxs := func(p *testPeer) {
  103. defer wg.Done()
  104. defer p.close()
  105. seen := make(map[common.Hash]bool)
  106. for _, tx := range alltxs {
  107. seen[tx.Hash()] = false
  108. }
  109. for n := 0; n < len(alltxs) && !t.Failed(); {
  110. var txs []*types.Transaction
  111. msg, err := p.ReadMsg()
  112. if err != nil {
  113. t.Errorf("%v: read error: %v", p.Peer, err)
  114. } else if msg.Code != TxMsg {
  115. t.Errorf("%v: got code %d, want TxMsg", p.Peer, msg.Code)
  116. }
  117. if err := msg.Decode(&txs); err != nil {
  118. t.Errorf("%v: %v", p.Peer, err)
  119. }
  120. for _, tx := range txs {
  121. hash := tx.Hash()
  122. seentx, want := seen[hash]
  123. if seentx {
  124. t.Errorf("%v: got tx more than once: %x", p.Peer, hash)
  125. }
  126. if !want {
  127. t.Errorf("%v: got unexpected tx: %x", p.Peer, hash)
  128. }
  129. seen[hash] = true
  130. n++
  131. }
  132. }
  133. }
  134. for i := 0; i < 3; i++ {
  135. p, _ := newTestPeer(pm)
  136. p.handshake(t)
  137. wg.Add(1)
  138. go checktxs(p)
  139. }
  140. wg.Wait()
  141. }
  142. // testPeer wraps all peer-related data for tests.
  143. type testPeer struct {
  144. p2p.MsgReadWriter // writing to the test peer feeds the protocol
  145. pipe *p2p.MsgPipeRW // the protocol read/writes on this end
  146. pm *ProtocolManager
  147. *peer
  148. }
  149. func newProtocolManagerForTesting(txAdded chan<- []*types.Transaction) *ProtocolManager {
  150. var (
  151. em = new(event.TypeMux)
  152. db, _ = ethdb.NewMemDatabase()
  153. chain, _ = core.NewChainManager(core.GenesisBlock(0, db), db, db, db, core.FakePow{}, em)
  154. txpool = &fakeTxPool{added: txAdded}
  155. pm = NewProtocolManager(0, em, txpool, core.FakePow{}, chain)
  156. )
  157. pm.Start()
  158. return pm
  159. }
  160. func newTestPeer(pm *ProtocolManager) (*testPeer, <-chan error) {
  161. var id discover.NodeID
  162. rand.Read(id[:])
  163. rw1, rw2 := p2p.MsgPipe()
  164. peer := pm.newPeer(pm.protVer, pm.netId, p2p.NewPeer(id, "test peer", nil), rw2)
  165. errc := make(chan error, 1)
  166. go func() {
  167. pm.newPeerCh <- peer
  168. errc <- pm.handle(peer)
  169. }()
  170. return &testPeer{rw1, rw2, pm, peer}, errc
  171. }
  172. func (p *testPeer) handshake(t *testing.T) {
  173. td, currentBlock, genesis := p.pm.chainman.Status()
  174. msg := &statusData{
  175. ProtocolVersion: uint32(p.pm.protVer),
  176. NetworkId: uint32(p.pm.netId),
  177. TD: td,
  178. CurrentBlock: currentBlock,
  179. GenesisBlock: genesis,
  180. }
  181. if err := p2p.ExpectMsg(p, StatusMsg, msg); err != nil {
  182. t.Fatalf("status recv: %v", err)
  183. }
  184. if err := p2p.Send(p, StatusMsg, msg); err != nil {
  185. t.Fatalf("status send: %v", err)
  186. }
  187. }
  188. func (p *testPeer) close() {
  189. p.pipe.Close()
  190. }
  191. type fakeTxPool struct {
  192. // all transactions are collected.
  193. mu sync.Mutex
  194. all []*types.Transaction
  195. // if added is non-nil, it receives added transactions.
  196. added chan<- []*types.Transaction
  197. }
  198. func (pool *fakeTxPool) AddTransactions(txs []*types.Transaction) {
  199. pool.mu.Lock()
  200. defer pool.mu.Unlock()
  201. pool.all = append(pool.all, txs...)
  202. if pool.added != nil {
  203. pool.added <- txs
  204. }
  205. }
  206. func (pool *fakeTxPool) GetTransactions() types.Transactions {
  207. pool.mu.Lock()
  208. defer pool.mu.Unlock()
  209. txs := make([]*types.Transaction, len(pool.all))
  210. copy(txs, pool.all)
  211. return types.Transactions(txs)
  212. }
  213. func newtx(from *crypto.Key, nonce uint64, datasize int) *types.Transaction {
  214. data := make([]byte, datasize)
  215. tx := types.NewTransaction(nonce, common.Address{}, big.NewInt(0), big.NewInt(100000), big.NewInt(0), data)
  216. tx, _ = tx.SignECDSA(from.PrivateKey)
  217. return tx
  218. }