sync.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188
  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package eth
  17. import (
  18. "math/rand"
  19. "sync/atomic"
  20. "time"
  21. "github.com/ethereum/go-ethereum/common"
  22. "github.com/ethereum/go-ethereum/core/types"
  23. "github.com/ethereum/go-ethereum/eth/downloader"
  24. "github.com/ethereum/go-ethereum/logger"
  25. "github.com/ethereum/go-ethereum/logger/glog"
  26. "github.com/ethereum/go-ethereum/p2p/discover"
  27. )
  28. const (
  29. forceSyncCycle = 10 * time.Second // Time interval to force syncs, even if few peers are available
  30. minDesiredPeerCount = 5 // Amount of peers desired to start syncing
  31. // This is the target size for the packs of transactions sent by txsyncLoop.
  32. // A pack can get larger than this if a single transactions exceeds this size.
  33. txsyncPackSize = 100 * 1024
  34. )
  35. type txsync struct {
  36. p *peer
  37. txs []*types.Transaction
  38. }
  39. // syncTransactions starts sending all currently pending transactions to the given peer.
  40. func (pm *ProtocolManager) syncTransactions(p *peer) {
  41. txs := pm.txpool.GetTransactions()
  42. if len(txs) == 0 {
  43. return
  44. }
  45. select {
  46. case pm.txsyncCh <- &txsync{p, txs}:
  47. case <-pm.quitSync:
  48. }
  49. }
  50. // txsyncLoop takes care of the initial transaction sync for each new
  51. // connection. When a new peer appears, we relay all currently pending
  52. // transactions. In order to minimise egress bandwidth usage, we send
  53. // the transactions in small packs to one peer at a time.
  54. func (pm *ProtocolManager) txsyncLoop() {
  55. var (
  56. pending = make(map[discover.NodeID]*txsync)
  57. sending = false // whether a send is active
  58. pack = new(txsync) // the pack that is being sent
  59. done = make(chan error, 1) // result of the send
  60. )
  61. // send starts a sending a pack of transactions from the sync.
  62. send := func(s *txsync) {
  63. // Fill pack with transactions up to the target size.
  64. size := common.StorageSize(0)
  65. pack.p = s.p
  66. pack.txs = pack.txs[:0]
  67. for i := 0; i < len(s.txs) && size < txsyncPackSize; i++ {
  68. pack.txs = append(pack.txs, s.txs[i])
  69. size += s.txs[i].Size()
  70. }
  71. // Remove the transactions that will be sent.
  72. s.txs = s.txs[:copy(s.txs, s.txs[len(pack.txs):])]
  73. if len(s.txs) == 0 {
  74. delete(pending, s.p.ID())
  75. }
  76. // Send the pack in the background.
  77. glog.V(logger.Detail).Infof("%v: sending %d transactions (%v)", s.p.Peer, len(pack.txs), size)
  78. sending = true
  79. go func() { done <- pack.p.SendTransactions(pack.txs) }()
  80. }
  81. // pick chooses the next pending sync.
  82. pick := func() *txsync {
  83. if len(pending) == 0 {
  84. return nil
  85. }
  86. n := rand.Intn(len(pending)) + 1
  87. for _, s := range pending {
  88. if n--; n == 0 {
  89. return s
  90. }
  91. }
  92. return nil
  93. }
  94. for {
  95. select {
  96. case s := <-pm.txsyncCh:
  97. pending[s.p.ID()] = s
  98. if !sending {
  99. send(s)
  100. }
  101. case err := <-done:
  102. sending = false
  103. // Stop tracking peers that cause send failures.
  104. if err != nil {
  105. glog.V(logger.Debug).Infof("%v: tx send failed: %v", pack.p.Peer, err)
  106. delete(pending, pack.p.ID())
  107. }
  108. // Schedule the next send.
  109. if s := pick(); s != nil {
  110. send(s)
  111. }
  112. case <-pm.quitSync:
  113. return
  114. }
  115. }
  116. }
  117. // syncer is responsible for periodically synchronising with the network, both
  118. // downloading hashes and blocks as well as handling the announcement handler.
  119. func (pm *ProtocolManager) syncer() {
  120. // Start and ensure cleanup of sync mechanisms
  121. pm.fetcher.Start()
  122. defer pm.fetcher.Stop()
  123. defer pm.downloader.Terminate()
  124. // Wait for different events to fire synchronisation operations
  125. forceSync := time.Tick(forceSyncCycle)
  126. for {
  127. select {
  128. case <-pm.newPeerCh:
  129. // Make sure we have peers to select from, then sync
  130. if pm.peers.Len() < minDesiredPeerCount {
  131. break
  132. }
  133. go pm.synchronise(pm.peers.BestPeer())
  134. case <-forceSync:
  135. // Force a sync even if not enough peers are present
  136. go pm.synchronise(pm.peers.BestPeer())
  137. case <-pm.noMorePeers:
  138. return
  139. }
  140. }
  141. }
  142. // synchronise tries to sync up our local block chain with a remote peer.
  143. func (pm *ProtocolManager) synchronise(peer *peer) {
  144. // Short circuit if no peers are available
  145. if peer == nil {
  146. return
  147. }
  148. // Make sure the peer's TD is higher than our own. If not drop.
  149. currentBlock := pm.blockchain.CurrentBlock()
  150. td := pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64())
  151. if peer.Td().Cmp(td) <= 0 {
  152. return
  153. }
  154. // Otherwise try to sync with the downloader
  155. mode := downloader.FullSync
  156. if atomic.LoadUint32(&pm.fastSync) == 1 {
  157. mode = downloader.FastSync
  158. }
  159. if err := pm.downloader.Synchronise(peer.id, peer.Head(), peer.Td(), mode); err != nil {
  160. return
  161. }
  162. atomic.StoreUint32(&pm.synced, 1) // Mark initial sync done
  163. // If fast sync was enabled, and we synced up, disable it
  164. if atomic.LoadUint32(&pm.fastSync) == 1 {
  165. // Disable fast sync if we indeed have something in our chain
  166. if pm.blockchain.CurrentBlock().NumberU64() > 0 {
  167. glog.V(logger.Info).Infof("fast sync complete, auto disabling")
  168. atomic.StoreUint32(&pm.fastSync, 0)
  169. }
  170. }
  171. }