downloader.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550
  1. package downloader
  2. import (
  3. "errors"
  4. "math/rand"
  5. "sync"
  6. "sync/atomic"
  7. "time"
  8. "gopkg.in/fatih/set.v0"
  9. "github.com/ethereum/go-ethereum/common"
  10. "github.com/ethereum/go-ethereum/core"
  11. "github.com/ethereum/go-ethereum/core/types"
  12. "github.com/ethereum/go-ethereum/event"
  13. "github.com/ethereum/go-ethereum/logger"
  14. "github.com/ethereum/go-ethereum/logger/glog"
  15. )
// Fixed limits and timeouts governing hash and block retrieval.
const (
	MinHashFetch     = 512              // Minimum amount of new hashes per response below which a peer is considered stalling
	MaxHashFetch     = 2048             // Amount of hashes to be fetched per retrieval request
	MaxBlockFetch    = 128              // Amount of blocks to be fetched per retrieval request
	peerCountTimeout = 12 * time.Second // Amount of time it takes for the peer handler to ignore minDesiredPeerCount
	hashTTL          = 5 * time.Second  // Time it takes for a hash request to time out
)
// Tunable parameters. They are declared as variables rather than constants;
// presumably so that tests can override them - TODO confirm.
var (
	blockTTL            = 5 * time.Second // Time it takes for a block request to time out
	crossCheckCycle     = time.Second     // Period after which to check for expired cross checks
	minDesiredPeerCount = 5               // Amount of peers desired to start syncing
)
// Sentinel errors used by the downloader. The capitalised ones are exported
// and can therefore be compared against by code outside this package.
var (
	errLowTd            = errors.New("peers TD is too low")
	ErrBusy             = errors.New("busy")                         // Returned by Synchronise while another sync is running
	errUnknownPeer      = errors.New("peer is unknown or unhealthy") // Returned by Synchronise when the peer id cannot be resolved
	ErrBadPeer          = errors.New("action from bad peer ignored") // Returned when a hash response contains nothing new
	ErrStallingPeer     = errors.New("peer is stalling")             // Returned when a hash response adds fewer than MinHashFetch hashes
	errNoPeers          = errors.New("no peers to keep download active")
	ErrPendingQueue     = errors.New("pending items in queue") // Returned by Synchronise when cached blocks are still queued
	ErrTimeout          = errors.New("timeout")                // Returned when hash fetching times out with no replacement peer
	errEmptyHashSet     = errors.New("empty hash set by peer")
	errPeersUnavailable = errors.New("no peers available or all peers tried for block download process")
	errAlreadyInPool    = errors.New("hash already in pool")
	ErrInvalidChain     = errors.New("retrieved hash chain is invalid") // Returned when a banned hash shows up or the queue rejects a delivery
	ErrCrossCheckFailed = errors.New("block cross-check failed")        // Returned when a cross check mismatches or expires
	errCancelHashFetch  = errors.New("hash fetching cancelled (requested)")
	errCancelBlockFetch = errors.New("block downloading cancelled (requested)")
	errNoSyncActive     = errors.New("no sync active") // Returned by the Deliver* methods outside of a running sync
)
// hashCheckFn is the signature of the Downloader.hasBlock callback, which
// fetchHashes consults while looking for a hash it has in common with the peer.
type hashCheckFn func(common.Hash) bool

// getBlockFn is the signature of the Downloader.getBlock callback, which
// fetchHashes uses to look up the common block and derive the queue offset.
type getBlockFn func(common.Hash) *types.Block

// chainInsertFn is not referenced in the visible part of this file;
// presumably it imports a batch of blocks into the chain - TODO confirm.
type chainInsertFn func(types.Blocks) (int, error)

// hashIterFn is not referenced in the visible part of this file;
// presumably it yields successive hashes of a chain - TODO confirm.
type hashIterFn func() (common.Hash, error)
// blockPack is a batch of blocks tagged with the id of the delivering peer.
// It is the message type carried by Downloader.blockCh. DeliverBlocks builds
// it with a positional literal, so the field order must not change.
type blockPack struct {
	peerId string         // id of the peer that delivered the blocks
	blocks []*types.Block // the delivered blocks
}
// hashPack is a batch of hashes tagged with the id of the delivering peer.
// It is the message type carried by Downloader.hashCh. DeliverHashes builds
// it with a positional literal, so the field order must not change.
type hashPack struct {
	peerId string        // id of the peer that delivered the hashes
	hashes []common.Hash // the delivered hashes
}
// crossCheck is a pending verification of a hash chain: the block requested
// for the checked hash must arrive before expire and name parent as its parent.
type crossCheck struct {
	expire time.Time   // deadline after which the check counts as failed
	parent common.Hash // hash the delivered block's ParentHash must equal
}
// Downloader drives a synchronisation run against remote peers: it first
// retrieves a chain of hashes from one peer and then downloads the matching
// blocks from all available peers.
type Downloader struct {
	mux *event.TypeMux // Event mux on which StartEvent, DoneEvent and FailedEvent are posted
	mu  sync.RWMutex   // Not referenced in the visible part of this file

	queue  *queue                      // Scheduler for selecting the hashes to download
	peers  *peerSet                    // Set of active peers from which download can proceed
	checks map[common.Hash]*crossCheck // Pending cross checks to verify a hash chain
	banned *set.SetNonTS               // Set of hashes we've received and banned

	// Callbacks
	hasBlock hashCheckFn // Consulted by fetchHashes when searching for a common hash
	getBlock getBlockFn  // Consulted by fetchHashes to locate the common block

	// Status
	synchronising int32 // 1 while Synchronise is running, 0 otherwise; accessed atomically
	notified      int32 // Flipped to 1 once the "sync started" message was logged; accessed atomically

	// Channels
	newPeerCh chan *peer     // Created by New with a buffer of one; not otherwise used in the visible code
	hashCh    chan hashPack  // Hash batches handed over by DeliverHashes
	blockCh   chan blockPack // Block batches handed over by DeliverBlocks

	cancelCh   chan struct{} // Channel to cancel mid-flight syncs
	cancelLock sync.RWMutex  // Lock to protect the cancel channel in delivers
}
// Block is an origin-tagged blockchain block, the element type returned by
// Downloader.TakeBlocks.
type Block struct {
	RawBlock   *types.Block // the underlying chain block
	OriginPeer string       // presumably the id of the peer the block was downloaded from - TODO confirm
}
  87. func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock getBlockFn) *Downloader {
  88. // Create the base downloader
  89. downloader := &Downloader{
  90. mux: mux,
  91. queue: newQueue(),
  92. peers: newPeerSet(),
  93. hasBlock: hasBlock,
  94. getBlock: getBlock,
  95. newPeerCh: make(chan *peer, 1),
  96. hashCh: make(chan hashPack, 1),
  97. blockCh: make(chan blockPack, 1),
  98. }
  99. // Inject all the known bad hashes
  100. downloader.banned = set.NewNonTS()
  101. for hash, _ := range core.BadHashes {
  102. downloader.banned.Add(hash)
  103. }
  104. return downloader
  105. }
  106. func (d *Downloader) Stats() (current int, max int) {
  107. return d.queue.Size()
  108. }
  109. // Synchronising returns the state of the downloader
  110. func (d *Downloader) Synchronising() bool {
  111. return atomic.LoadInt32(&d.synchronising) > 0
  112. }
  113. // RegisterPeer injects a new download peer into the set of block source to be
  114. // used for fetching hashes and blocks from.
  115. func (d *Downloader) RegisterPeer(id string, head common.Hash, getHashes hashFetcherFn, getBlocks blockFetcherFn) error {
  116. glog.V(logger.Detail).Infoln("Registering peer", id)
  117. if err := d.peers.Register(newPeer(id, head, getHashes, getBlocks)); err != nil {
  118. glog.V(logger.Error).Infoln("Register failed:", err)
  119. return err
  120. }
  121. return nil
  122. }
  123. // UnregisterPeer remove a peer from the known list, preventing any action from
  124. // the specified peer.
  125. func (d *Downloader) UnregisterPeer(id string) error {
  126. glog.V(logger.Detail).Infoln("Unregistering peer", id)
  127. if err := d.peers.Unregister(id); err != nil {
  128. glog.V(logger.Error).Infoln("Unregister failed:", err)
  129. return err
  130. }
  131. return nil
  132. }
  133. // Synchronise will select the peer and use it for synchronising. If an empty string is given
  134. // it will use the best peer possible and synchronize if it's TD is higher than our own. If any of the
  135. // checks fail an error will be returned. This method is synchronous
  136. func (d *Downloader) Synchronise(id string, hash common.Hash) error {
  137. // Make sure only one goroutine is ever allowed past this point at once
  138. if !atomic.CompareAndSwapInt32(&d.synchronising, 0, 1) {
  139. return ErrBusy
  140. }
  141. defer atomic.StoreInt32(&d.synchronising, 0)
  142. // Post a user notification of the sync (only once per session)
  143. if atomic.CompareAndSwapInt32(&d.notified, 0, 1) {
  144. glog.V(logger.Info).Infoln("Block synchronisation started")
  145. }
  146. // Create cancel channel for aborting mid-flight
  147. d.cancelLock.Lock()
  148. d.cancelCh = make(chan struct{})
  149. d.cancelLock.Unlock()
  150. // Abort if the queue still contains some leftover data
  151. if _, cached := d.queue.Size(); cached > 0 && d.queue.GetHeadBlock() != nil {
  152. return ErrPendingQueue
  153. }
  154. // Reset the queue and peer set to clean any internal leftover state
  155. d.queue.Reset()
  156. d.peers.Reset()
  157. d.checks = make(map[common.Hash]*crossCheck)
  158. // Retrieve the origin peer and initiate the downloading process
  159. p := d.peers.Peer(id)
  160. if p == nil {
  161. return errUnknownPeer
  162. }
  163. return d.syncWithPeer(p, hash)
  164. }
  165. // TakeBlocks takes blocks from the queue and yields them to the caller.
  166. func (d *Downloader) TakeBlocks() []*Block {
  167. return d.queue.TakeBlocks()
  168. }
  169. func (d *Downloader) Has(hash common.Hash) bool {
  170. return d.queue.Has(hash)
  171. }
  172. // syncWithPeer starts a block synchronization based on the hash chain from the
  173. // specified peer and head hash.
  174. func (d *Downloader) syncWithPeer(p *peer, hash common.Hash) (err error) {
  175. d.mux.Post(StartEvent{})
  176. defer func() {
  177. // reset on error
  178. if err != nil {
  179. d.Cancel()
  180. d.mux.Post(FailedEvent{err})
  181. } else {
  182. d.mux.Post(DoneEvent{})
  183. }
  184. }()
  185. glog.V(logger.Debug).Infoln("Synchronizing with the network using:", p.id)
  186. if err = d.fetchHashes(p, hash); err != nil {
  187. return err
  188. }
  189. if err = d.fetchBlocks(); err != nil {
  190. return err
  191. }
  192. glog.V(logger.Debug).Infoln("Synchronization completed")
  193. return nil
  194. }
  195. // Cancel cancels all of the operations and resets the queue. It returns true
  196. // if the cancel operation was completed.
  197. func (d *Downloader) Cancel() bool {
  198. // If we're not syncing just return.
  199. hs, bs := d.queue.Size()
  200. if atomic.LoadInt32(&d.synchronising) == 0 && hs == 0 && bs == 0 {
  201. return false
  202. }
  203. // Close the current cancel channel
  204. d.cancelLock.Lock()
  205. select {
  206. case <-d.cancelCh:
  207. // Channel was already closed
  208. default:
  209. close(d.cancelCh)
  210. }
  211. d.cancelLock.Unlock()
  212. // reset the queue
  213. d.queue.Reset()
  214. return true
  215. }
  216. // XXX Make synchronous
  217. func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
  218. glog.V(logger.Debug).Infof("Downloading hashes (%x) from %s", h[:4], p.id)
  219. start := time.Now()
  220. // Add the hash to the queue first, and start hash retrieval
  221. d.queue.Insert([]common.Hash{h})
  222. p.getHashes(h)
  223. var (
  224. active = p // active peer will help determine the current active peer
  225. head = common.Hash{} // common and last hash
  226. timeout = time.NewTimer(hashTTL) // timer to dump a non-responsive active peer
  227. attempted = make(map[string]bool) // attempted peers will help with retries
  228. crossTicker = time.NewTicker(crossCheckCycle) // ticker to periodically check expired cross checks
  229. )
  230. defer crossTicker.Stop()
  231. attempted[p.id] = true
  232. for finished := false; !finished; {
  233. select {
  234. case <-d.cancelCh:
  235. return errCancelHashFetch
  236. case hashPack := <-d.hashCh:
  237. // Make sure the active peer is giving us the hashes
  238. if hashPack.peerId != active.id {
  239. glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)\n", hashPack.peerId)
  240. break
  241. }
  242. timeout.Reset(hashTTL)
  243. // Make sure the peer actually gave something valid
  244. if len(hashPack.hashes) == 0 {
  245. glog.V(logger.Debug).Infof("Peer (%s) responded with empty hash set\n", active.id)
  246. return errEmptyHashSet
  247. }
  248. for _, hash := range hashPack.hashes {
  249. if d.banned.Has(hash) {
  250. glog.V(logger.Debug).Infof("Peer (%s) sent a known invalid chain\n", active.id)
  251. return ErrInvalidChain
  252. }
  253. }
  254. // Determine if we're done fetching hashes (queue up all pending), and continue if not done
  255. done, index := false, 0
  256. for index, head = range hashPack.hashes {
  257. if d.hasBlock(head) || d.queue.GetBlock(head) != nil {
  258. glog.V(logger.Debug).Infof("Found common hash %x\n", head[:4])
  259. hashPack.hashes = hashPack.hashes[:index]
  260. done = true
  261. break
  262. }
  263. }
  264. // Insert all the new hashes, but only continue if got something useful
  265. inserts := d.queue.Insert(hashPack.hashes)
  266. if len(inserts) == 0 && !done {
  267. glog.V(logger.Debug).Infof("Peer (%s) responded with stale hashes\n", active.id)
  268. return ErrBadPeer
  269. }
  270. if !done {
  271. // Check that the peer is not stalling the sync
  272. if len(inserts) < MinHashFetch {
  273. return ErrStallingPeer
  274. }
  275. // Try and fetch a random block to verify the hash batch
  276. // Skip the last hash as the cross check races with the next hash fetch
  277. cross := rand.Intn(len(inserts) - 1)
  278. origin, parent := inserts[cross], inserts[cross+1]
  279. glog.V(logger.Detail).Infof("Cross checking (%s) with %x/%x", active.id, origin, parent)
  280. d.checks[origin] = &crossCheck{
  281. expire: time.Now().Add(blockTTL),
  282. parent: parent,
  283. }
  284. active.getBlocks([]common.Hash{origin})
  285. // Also fetch a fresh
  286. active.getHashes(head)
  287. continue
  288. }
  289. // We're done, allocate the download cache and proceed pulling the blocks
  290. offset := 0
  291. if block := d.getBlock(head); block != nil {
  292. offset = int(block.NumberU64() + 1)
  293. }
  294. d.queue.Alloc(offset)
  295. finished = true
  296. case blockPack := <-d.blockCh:
  297. // Cross check the block with the random verifications
  298. if blockPack.peerId != active.id || len(blockPack.blocks) != 1 {
  299. continue
  300. }
  301. block := blockPack.blocks[0]
  302. if check, ok := d.checks[block.Hash()]; ok {
  303. if block.ParentHash() != check.parent {
  304. return ErrCrossCheckFailed
  305. }
  306. delete(d.checks, block.Hash())
  307. }
  308. case <-crossTicker.C:
  309. // Iterate over all the cross checks and fail the hash chain if they're not verified
  310. for hash, check := range d.checks {
  311. if time.Now().After(check.expire) {
  312. glog.V(logger.Debug).Infof("Cross check timeout for %x", hash)
  313. return ErrCrossCheckFailed
  314. }
  315. }
  316. case <-timeout.C:
  317. glog.V(logger.Debug).Infof("Peer (%s) didn't respond in time for hash request\n", p.id)
  318. var p *peer // p will be set if a peer can be found
  319. // Attempt to find a new peer by checking inclusion of peers best hash in our
  320. // already fetched hash list. This can't guarantee 100% correctness but does
  321. // a fair job. This is always either correct or false incorrect.
  322. for _, peer := range d.peers.AllPeers() {
  323. if d.queue.Has(peer.head) && !attempted[peer.id] {
  324. p = peer
  325. break
  326. }
  327. }
  328. // if all peers have been tried, abort the process entirely or if the hash is
  329. // the zero hash.
  330. if p == nil || (head == common.Hash{}) {
  331. return ErrTimeout
  332. }
  333. // set p to the active peer. this will invalidate any hashes that may be returned
  334. // by our previous (delayed) peer.
  335. active = p
  336. p.getHashes(head)
  337. glog.V(logger.Debug).Infof("Hash fetching switched to new peer(%s)\n", p.id)
  338. }
  339. }
  340. glog.V(logger.Debug).Infof("Downloaded hashes (%d) in %v\n", d.queue.Pending(), time.Since(start))
  341. return nil
  342. }
  343. // fetchBlocks iteratively downloads the entire schedules block-chain, taking
  344. // any available peers, reserving a chunk of blocks for each, wait for delivery
  345. // and periodically checking for timeouts.
  346. func (d *Downloader) fetchBlocks() error {
  347. glog.V(logger.Debug).Infoln("Downloading", d.queue.Pending(), "block(s)")
  348. start := time.Now()
  349. // default ticker for re-fetching blocks every now and then
  350. ticker := time.NewTicker(20 * time.Millisecond)
  351. out:
  352. for {
  353. select {
  354. case <-d.cancelCh:
  355. return errCancelBlockFetch
  356. case blockPack := <-d.blockCh:
  357. // Short circuit if it's a stale cross check
  358. if len(blockPack.blocks) == 1 {
  359. block := blockPack.blocks[0]
  360. if _, ok := d.checks[block.Hash()]; ok {
  361. delete(d.checks, block.Hash())
  362. continue
  363. }
  364. }
  365. // If the peer was previously banned and failed to deliver it's pack
  366. // in a reasonable time frame, ignore it's message.
  367. if peer := d.peers.Peer(blockPack.peerId); peer != nil {
  368. // Deliver the received chunk of blocks
  369. if err := d.queue.Deliver(blockPack.peerId, blockPack.blocks); err != nil {
  370. if err == ErrInvalidChain {
  371. // The hash chain is invalid (blocks are not ordered properly), abort
  372. return err
  373. }
  374. // Peer did deliver, but some blocks were off, penalize
  375. glog.V(logger.Debug).Infof("Failed delivery for peer %s: %v\n", blockPack.peerId, err)
  376. peer.Demote()
  377. break
  378. }
  379. if glog.V(logger.Debug) && len(blockPack.blocks) > 0 {
  380. glog.Infof("Added %d blocks from: %s\n", len(blockPack.blocks), blockPack.peerId)
  381. }
  382. // Promote the peer and update it's idle state
  383. peer.Promote()
  384. peer.SetIdle()
  385. }
  386. case <-ticker.C:
  387. // Check for bad peers. Bad peers may indicate a peer not responding
  388. // to a `getBlocks` message. A timeout of 5 seconds is set. Peers
  389. // that badly or poorly behave are removed from the peer set (not banned).
  390. // Bad peers are excluded from the available peer set and therefor won't be
  391. // reused. XXX We could re-introduce peers after X time.
  392. badPeers := d.queue.Expire(blockTTL)
  393. for _, pid := range badPeers {
  394. // XXX We could make use of a reputation system here ranking peers
  395. // in their performance
  396. // 1) Time for them to respond;
  397. // 2) Measure their speed;
  398. // 3) Amount and availability.
  399. if peer := d.peers.Peer(pid); peer != nil {
  400. peer.Demote()
  401. }
  402. }
  403. // After removing bad peers make sure we actually have sufficient peer left to keep downloading
  404. if d.peers.Len() == 0 {
  405. return errNoPeers
  406. }
  407. // If there are unrequested hashes left start fetching
  408. // from the available peers.
  409. if d.queue.Pending() > 0 {
  410. // Throttle the download if block cache is full and waiting processing
  411. if d.queue.Throttle() {
  412. continue
  413. }
  414. // Send a download request to all idle peers, until throttled
  415. idlePeers := d.peers.IdlePeers()
  416. for _, peer := range idlePeers {
  417. // Short circuit if throttling activated since above
  418. if d.queue.Throttle() {
  419. break
  420. }
  421. // Get a possible chunk. If nil is returned no chunk
  422. // could be returned due to no hashes available.
  423. request := d.queue.Reserve(peer, MaxBlockFetch)
  424. if request == nil {
  425. continue
  426. }
  427. // Fetch the chunk and check for error. If the peer was somehow
  428. // already fetching a chunk due to a bug, it will be returned to
  429. // the queue
  430. if err := peer.Fetch(request); err != nil {
  431. glog.V(logger.Error).Infof("Peer %s received double work\n", peer.id)
  432. d.queue.Cancel(request)
  433. }
  434. }
  435. // Make sure that we have peers available for fetching. If all peers have been tried
  436. // and all failed throw an error
  437. if d.queue.InFlight() == 0 {
  438. return errPeersUnavailable
  439. }
  440. } else if d.queue.InFlight() == 0 {
  441. // When there are no more queue and no more in flight, We can
  442. // safely assume we're done. Another part of the process will check
  443. // for parent errors and will re-request anything that's missing
  444. break out
  445. }
  446. }
  447. }
  448. glog.V(logger.Detail).Infoln("Downloaded block(s) in", time.Since(start))
  449. return nil
  450. }
  451. // DeliverBlocks injects a new batch of blocks received from a remote node.
  452. // This is usually invoked through the BlocksMsg by the protocol handler.
  453. func (d *Downloader) DeliverBlocks(id string, blocks []*types.Block) error {
  454. // Make sure the downloader is active
  455. if atomic.LoadInt32(&d.synchronising) == 0 {
  456. return errNoSyncActive
  457. }
  458. // Deliver or abort if the sync is canceled while queuing
  459. d.cancelLock.RLock()
  460. cancel := d.cancelCh
  461. d.cancelLock.RUnlock()
  462. select {
  463. case d.blockCh <- blockPack{id, blocks}:
  464. return nil
  465. case <-cancel:
  466. return errNoSyncActive
  467. }
  468. }
  469. // DeliverHashes injects a new batch of hashes received from a remote node into
  470. // the download schedule. This is usually invoked through the BlockHashesMsg by
  471. // the protocol handler.
  472. func (d *Downloader) DeliverHashes(id string, hashes []common.Hash) error {
  473. // Make sure the downloader is active
  474. if atomic.LoadInt32(&d.synchronising) == 0 {
  475. return errNoSyncActive
  476. }
  477. // Deliver or abort if the sync is canceled while queuing
  478. d.cancelLock.RLock()
  479. cancel := d.cancelCh
  480. d.cancelLock.RUnlock()
  481. select {
  482. case d.hashCh <- hashPack{id, hashes}:
  483. return nil
  484. case <-cancel:
  485. return errNoSyncActive
  486. }
  487. }