queue.go

// Contains the block download scheduler to collect download tasks and schedule
// them in an ordered and throttled way.
package downloader

import (
	"errors"
	"fmt"
	"sync"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/logger"
	"github.com/ethereum/go-ethereum/logger/glog"
	"gopkg.in/karalabe/cookiejar.v2/collections/prque"
)

var (
	blockCacheLimit = 8 * MaxBlockFetch // Maximum number of blocks to cache before throttling the download
)

var (
	errNoFetchesPending = errors.New("no fetches pending")
	errStaleDelivery    = errors.New("stale delivery")
)

// fetchRequest is a currently running block retrieval operation.
type fetchRequest struct {
	Peer   *peer               // Peer to which the request was sent
	Hashes map[common.Hash]int // Requested hashes with their insertion index (priority)
	Time   time.Time           // Time when the request was made
}

// queue represents hashes that either need fetching or are being fetched.
type queue struct {
	hashPool    map[common.Hash]int // Pending hashes, mapping to their insertion index (priority)
	hashQueue   *prque.Prque        // Priority queue of the block hashes to fetch
	hashCounter int                 // Counter indexing the added hashes to ensure retrieval order

	pendPool map[string]*fetchRequest // Currently pending block retrieval operations

	blockPool   map[common.Hash]int // Hash-set of the downloaded data blocks, mapping to cache indexes
	blockCache  []*Block            // Downloaded but not yet delivered blocks
	blockOffset int                 // Offset of the first cached block in the block-chain

	lock sync.RWMutex
}
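
// Cache invariant, relied on by Deliver, GetBlock and TakeBlocks: whenever
// blockCache[i] is non-nil it holds the block numbered blockOffset+i, making
// the cache a sliding window over a contiguous section of the chain.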

// newQueue creates a new download queue for scheduling block retrieval.
func newQueue() *queue {
	return &queue{
		hashPool:   make(map[common.Hash]int),
		hashQueue:  prque.New(),
		pendPool:   make(map[string]*fetchRequest),
		blockPool:  make(map[common.Hash]int),
		blockCache: make([]*Block, blockCacheLimit),
	}
}
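
// A minimal sketch of the intended call flow (p, head, hashes and
// fetchedBlocks are placeholders; the real driver is the downloader's sync
// loop):
//
//	q := newQueue()
//	q.Prepare(head + 1)                 // first block number expected in the cache
//	q.Insert(hashes)                    // schedule freshly announced hashes
//	if req := q.Reserve(p, MaxBlockFetch); req != nil {
//		// ... request req.Hashes from the peer, then hand back the response:
//		q.Deliver(p.id, fetchedBlocks)  // or q.Cancel(req) / q.Expire(timeout)
//	}
//	blocks := q.TakeBlocks()            // drain the contiguous downloaded prefix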

// Reset clears out the queue contents.
func (q *queue) Reset() {
	q.lock.Lock()
	defer q.lock.Unlock()

	q.hashPool = make(map[common.Hash]int)
	q.hashQueue.Reset()
	q.hashCounter = 0

	q.pendPool = make(map[string]*fetchRequest)

	q.blockPool = make(map[common.Hash]int)
	q.blockOffset = 0
	q.blockCache = make([]*Block, blockCacheLimit)
}

// Size retrieves the number of hashes in the queue, returning separately the
// count of pending and of already downloaded ones.
func (q *queue) Size() (int, int) {
	q.lock.RLock()
	defer q.lock.RUnlock()

	return len(q.hashPool), len(q.blockPool)
}

// Pending retrieves the number of hashes pending for retrieval.
func (q *queue) Pending() int {
	q.lock.RLock()
	defer q.lock.RUnlock()

	return q.hashQueue.Size()
}

// InFlight retrieves the number of fetch requests currently in flight.
func (q *queue) InFlight() int {
	q.lock.RLock()
	defer q.lock.RUnlock()

	return len(q.pendPool)
}

// Throttle checks if the download should be throttled (active block fetches
// exceed block cache).
func (q *queue) Throttle() bool {
	q.lock.RLock()
	defer q.lock.RUnlock()

	// Calculate the currently in-flight block requests
	pending := 0
	for _, request := range q.pendPool {
		pending += len(request.Hashes)
	}
	// Throttle if more blocks are in-flight than free space in the cache
	return pending >= len(q.blockCache)-len(q.blockPool)
}
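
// Worked example of the condition above, assuming MaxBlockFetch = 128 and
// hence a cache of 8*128 = 1024 slots: with 900 blocks downloaded but not yet
// taken and 130 hashes in flight, the free space is 1024-900 = 124 < 130, so
// fetching throttles until TakeBlocks drains the cache head.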

// Has checks if a hash is within the download queue or not.
func (q *queue) Has(hash common.Hash) bool {
	q.lock.RLock()
	defer q.lock.RUnlock()

	if _, ok := q.hashPool[hash]; ok {
		return true
	}
	if _, ok := q.blockPool[hash]; ok {
		return true
	}
	return false
}

// Insert adds a set of hashes to the download queue for scheduling, returning
// the new hashes encountered.
func (q *queue) Insert(hashes []common.Hash) []common.Hash {
	q.lock.Lock()
	defer q.lock.Unlock()

	// Insert all the hashes prioritized in the arrival order
	inserts := make([]common.Hash, 0, len(hashes))
	for _, hash := range hashes {
		// Skip anything we already have
		if old, ok := q.hashPool[hash]; ok {
			glog.V(logger.Warn).Infof("Hash %x already scheduled at index %v", hash, old)
			continue
		}
		// Update the counters and insert the hash
		q.hashCounter = q.hashCounter + 1
		inserts = append(inserts, hash)

		q.hashPool[hash] = q.hashCounter
		q.hashQueue.Push(hash, float32(q.hashCounter)) // Highest gets scheduled first
	}
	return inserts
}

// GetHeadBlock retrieves the first block from the cache, or nil if it hasn't
// been downloaded yet (or is simply non-existent).
func (q *queue) GetHeadBlock() *Block {
	q.lock.RLock()
	defer q.lock.RUnlock()

	if len(q.blockCache) == 0 {
		return nil
	}
	return q.blockCache[0]
}

// GetBlock retrieves a downloaded block, or nil if non-existent.
func (q *queue) GetBlock(hash common.Hash) *Block {
	q.lock.RLock()
	defer q.lock.RUnlock()

	// Short circuit if the block hasn't been downloaded yet
	index, ok := q.blockPool[hash]
	if !ok {
		return nil
	}
	// Return the block if it's still available in the cache
	if q.blockOffset <= index && index < q.blockOffset+len(q.blockCache) {
		return q.blockCache[index-q.blockOffset]
	}
	return nil
}

// TakeBlocks retrieves and permanently removes a batch of blocks from the cache.
func (q *queue) TakeBlocks() []*Block {
	q.lock.Lock()
	defer q.lock.Unlock()

	// Accumulate all available blocks
	blocks := []*Block{}
	for _, block := range q.blockCache {
		if block == nil {
			break
		}
		blocks = append(blocks, block)
		delete(q.blockPool, block.RawBlock.Hash())
	}
	// Delete the blocks from the slice and let them be garbage collected:
	// without this slice trick the blocks would stay in memory until nil
	// was assigned to q.blockCache itself.
	copy(q.blockCache, q.blockCache[len(blocks):])
	for k, n := len(q.blockCache)-len(blocks), len(q.blockCache); k < n; k++ {
		q.blockCache[k] = nil
	}
	q.blockOffset += len(blocks)

	return blocks
}
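
// Illustration of the shift above on a hypothetical 4-slot cache [A, B, nil, C]
// with blockOffset 10: the loop takes [A, B] and stops at the gap, copy turns
// the cache into [nil, C, nil, C], the tail loop nils it to [nil, C, nil, nil],
// and blockOffset becomes 12, so C (block 13) still maps to slot 13-12 = 1.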

// Reserve reserves a set of hashes for the given peer, skipping any previously
// failed download.
func (q *queue) Reserve(p *peer, count int) *fetchRequest {
	q.lock.Lock()
	defer q.lock.Unlock()

	// Short circuit if the pool has been depleted, or if the peer's already
	// downloading something (sanity check not to corrupt state)
	if q.hashQueue.Empty() {
		return nil
	}
	if _, ok := q.pendPool[p.id]; ok {
		return nil
	}
	// Calculate an upper limit on the hashes we might fetch (i.e. throttling)
	space := len(q.blockCache) - len(q.blockPool)
	for _, request := range q.pendPool {
		space -= len(request.Hashes)
	}
	// Retrieve a batch of hashes, skipping previously failed ones
	send := make(map[common.Hash]int)
	skip := make(map[common.Hash]int)

	for proc := 0; proc < space && len(send) < count && !q.hashQueue.Empty(); proc++ {
		hash, priority := q.hashQueue.Pop()
		if p.ignored.Has(hash) {
			skip[hash.(common.Hash)] = int(priority)
		} else {
			send[hash.(common.Hash)] = int(priority)
		}
	}
	// Merge all the skipped hashes back
	for hash, index := range skip {
		q.hashQueue.Push(hash, float32(index))
	}
	// Assemble and return the block download request
	if len(send) == 0 {
		return nil
	}
	request := &fetchRequest{
		Peer:   p,
		Hashes: send,
		Time:   time.Now(),
	}
	q.pendPool[p.id] = request

	return request
}
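
// Example of the skip logic: if a peer earlier delivered an empty response for
// hash H, Deliver added H to that peer's ignored set; a later Reserve for the
// same peer pops H, parks it in skip and re-pushes it afterwards, keeping H
// schedulable for other peers without ever re-assigning it to the failing one.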

// Cancel aborts a fetch request, returning all pending hashes to the queue.
func (q *queue) Cancel(request *fetchRequest) {
	q.lock.Lock()
	defer q.lock.Unlock()

	for hash, index := range request.Hashes {
		q.hashQueue.Push(hash, float32(index))
	}
	delete(q.pendPool, request.Peer.id)
}

// Expire checks for in-flight requests that exceeded a timeout allowance,
// canceling them and returning the responsible peers for penalization.
func (q *queue) Expire(timeout time.Duration) []string {
	q.lock.Lock()
	defer q.lock.Unlock()

	// Iterate over the expired requests and return each to the queue
	peers := []string{}
	for id, request := range q.pendPool {
		if time.Since(request.Time) > timeout {
			for hash, index := range request.Hashes {
				q.hashQueue.Push(hash, float32(index))
			}
			peers = append(peers, id)
		}
	}
	// Remove the expired requests from the pending pool
	for _, id := range peers {
		delete(q.pendPool, id)
	}
	return peers
}

// Deliver injects a block retrieval response into the download queue.
func (q *queue) Deliver(id string, blocks []*types.Block) (err error) {
	q.lock.Lock()
	defer q.lock.Unlock()

	// Short circuit if the blocks were never requested
	request := q.pendPool[id]
	if request == nil {
		return errNoFetchesPending
	}
	delete(q.pendPool, id)

	// If no blocks were retrieved, mark them as unavailable for the origin peer
	if len(blocks) == 0 {
		for hash := range request.Hashes {
			request.Peer.ignored.Add(hash)
		}
	}
	// Iterate over the downloaded blocks and add each of them
	errs := make([]error, 0)
	for _, block := range blocks {
		// Skip any blocks that were not requested
		hash := block.Hash()
		if _, ok := request.Hashes[hash]; !ok {
			errs = append(errs, fmt.Errorf("non-requested block %x", hash))
			continue
		}
		// If a requested block falls out of the range, the hash chain is invalid
		index := int(block.NumberU64()) - q.blockOffset
		if index >= len(q.blockCache) || index < 0 {
			return errInvalidChain
		}
		// Otherwise merge the block and mark the hash as done
		q.blockCache[index] = &Block{
			RawBlock:   block,
			OriginPeer: id,
		}
		delete(request.Hashes, hash)
		delete(q.hashPool, hash)
		q.blockPool[hash] = int(block.NumberU64())
	}
	// Return all failed or missing fetches to the queue
	for hash, index := range request.Hashes {
		q.hashQueue.Push(hash, float32(index))
	}
	// If none of the blocks were good, it's a stale delivery
	if len(errs) != 0 {
		if len(errs) == len(blocks) {
			return errStaleDelivery
		}
		return fmt.Errorf("multiple failures: %v", errs)
	}
	return nil
}
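
// Index arithmetic example for the range check above: with blockOffset = 1000
// and a 1024-slot cache, a delivered block numbered 1003 lands in blockCache[3],
// while anything numbered below 1000 or at/above 2024 cannot belong to the
// window being synchronised and trips errInvalidChain.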

// Prepare configures the block cache offset to allow accepting inbound blocks.
func (q *queue) Prepare(offset int) {
	q.lock.Lock()
	defer q.lock.Unlock()

	if q.blockOffset < offset {
		q.blockOffset = offset
	}
}