queue.go 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341
  1. package downloader
  2. import (
  3. "errors"
  4. "fmt"
  5. "sync"
  6. "time"
  7. "github.com/ethereum/go-ethereum/common"
  8. "github.com/ethereum/go-ethereum/core/types"
  9. "gopkg.in/karalabe/cookiejar.v2/collections/prque"
  10. )
  11. const (
  12. blockCacheLimit = 1024 // Maximum number of blocks to cache before throttling the download
  13. )
  14. // fetchRequest is a currently running block retrieval operation.
  15. type fetchRequest struct {
  16. Peer *peer // Peer to which the request was sent
  17. Hashes map[common.Hash]int // Requested hashes with their insertion index (priority)
  18. Time time.Time // Time when the request was made
  19. }
  20. // queue represents hashes that are either need fetching or are being fetched
  21. type queue struct {
  22. hashPool map[common.Hash]int // Pending hashes, mapping to their insertion index (priority)
  23. hashQueue *prque.Prque // Priority queue of the block hashes to fetch
  24. hashCounter int // Counter indexing the added hashes to ensure retrieval order
  25. pendPool map[string]*fetchRequest // Currently pending block retrieval operations
  26. blockPool map[common.Hash]int // Hash-set of the downloaded data blocks, mapping to cache indexes
  27. blockCache []*types.Block // Downloaded but not yet delivered blocks
  28. blockOffset int // Offset of the first cached block in the block-chain
  29. lock sync.RWMutex
  30. }
  31. // newQueue creates a new download queue for scheduling block retrieval.
  32. func newQueue() *queue {
  33. return &queue{
  34. hashPool: make(map[common.Hash]int),
  35. hashQueue: prque.New(),
  36. pendPool: make(map[string]*fetchRequest),
  37. blockPool: make(map[common.Hash]int),
  38. }
  39. }
  40. // Reset clears out the queue contents.
  41. func (q *queue) Reset() {
  42. q.lock.Lock()
  43. defer q.lock.Unlock()
  44. q.hashPool = make(map[common.Hash]int)
  45. q.hashQueue.Reset()
  46. q.hashCounter = 0
  47. q.pendPool = make(map[string]*fetchRequest)
  48. q.blockPool = make(map[common.Hash]int)
  49. q.blockOffset = 0
  50. q.blockCache = nil
  51. }
  52. // Size retrieves the number of hashes in the queue, returning separately for
  53. // pending and already downloaded.
  54. func (q *queue) Size() (int, int) {
  55. q.lock.RLock()
  56. defer q.lock.RUnlock()
  57. return len(q.hashPool), len(q.blockPool)
  58. }
  59. // Pending retrieves the number of hashes pending for retrieval.
  60. func (q *queue) Pending() int {
  61. q.lock.RLock()
  62. defer q.lock.RUnlock()
  63. return q.hashQueue.Size()
  64. }
  65. // InFlight retrieves the number of fetch requests currently in flight.
  66. func (q *queue) InFlight() int {
  67. q.lock.RLock()
  68. defer q.lock.RUnlock()
  69. return len(q.pendPool)
  70. }
  71. // Throttle checks if the download should be throttled (active block fetches
  72. // exceed block cache).
  73. func (q *queue) Throttle() bool {
  74. q.lock.RLock()
  75. defer q.lock.RUnlock()
  76. // Calculate the currently in-flight block requests
  77. pending := 0
  78. for _, request := range q.pendPool {
  79. pending += len(request.Hashes)
  80. }
  81. // Throttle if more blocks are in-flight than free space in the cache
  82. return pending >= len(q.blockCache)-len(q.blockPool)
  83. }
  84. // Has checks if a hash is within the download queue or not.
  85. func (q *queue) Has(hash common.Hash) bool {
  86. q.lock.RLock()
  87. defer q.lock.RUnlock()
  88. if _, ok := q.hashPool[hash]; ok {
  89. return true
  90. }
  91. if _, ok := q.blockPool[hash]; ok {
  92. return true
  93. }
  94. return false
  95. }
  96. // Insert adds a set of hashes for the download queue for scheduling.
  97. func (q *queue) Insert(hashes []common.Hash) {
  98. q.lock.Lock()
  99. defer q.lock.Unlock()
  100. // Insert all the hashes prioritized in the arrival order
  101. for i, hash := range hashes {
  102. index := q.hashCounter + i
  103. q.hashPool[hash] = index
  104. q.hashQueue.Push(hash, float32(index)) // Highest gets schedules first
  105. }
  106. // Update the hash counter for the next batch of inserts
  107. q.hashCounter += len(hashes)
  108. }
  109. // GetHeadBlock retrieves the first block from the cache, or nil if it hasn't
  110. // been downloaded yet (or simply non existent).
  111. func (q *queue) GetHeadBlock() *types.Block {
  112. q.lock.RLock()
  113. defer q.lock.RUnlock()
  114. if len(q.blockCache) == 0 {
  115. return nil
  116. }
  117. return q.blockCache[0]
  118. }
  119. // GetBlock retrieves a downloaded block, or nil if non-existent.
  120. func (q *queue) GetBlock(hash common.Hash) *types.Block {
  121. q.lock.RLock()
  122. defer q.lock.RUnlock()
  123. // Short circuit if the block hasn't been downloaded yet
  124. index, ok := q.blockPool[hash]
  125. if !ok {
  126. return nil
  127. }
  128. // Return the block if it's still available in the cache
  129. if q.blockOffset <= index && index < q.blockOffset+len(q.blockCache) {
  130. return q.blockCache[index-q.blockOffset]
  131. }
  132. return nil
  133. }
  134. // TakeBlocks retrieves and permanently removes a batch of blocks from the cache.
  135. // The head parameter is required to prevent a race condition where concurrent
  136. // takes may fail parent verifications.
  137. func (q *queue) TakeBlocks(head *types.Block) types.Blocks {
  138. q.lock.Lock()
  139. defer q.lock.Unlock()
  140. // Short circuit if the head block's different
  141. if len(q.blockCache) == 0 || q.blockCache[0] != head {
  142. return nil
  143. }
  144. // Otherwise accumulate all available blocks
  145. var blocks types.Blocks
  146. for _, block := range q.blockCache {
  147. if block == nil {
  148. break
  149. }
  150. blocks = append(blocks, block)
  151. delete(q.blockPool, block.Hash())
  152. }
  153. // Delete the blocks from the slice and let them be garbage collected
  154. // without this slice trick the blocks would stay in memory until nil
  155. // would be assigned to q.blocks
  156. copy(q.blockCache, q.blockCache[len(blocks):])
  157. for k, n := len(q.blockCache)-len(blocks), len(q.blockCache); k < n; k++ {
  158. q.blockCache[k] = nil
  159. }
  160. q.blockOffset += len(blocks)
  161. return blocks
  162. }
  163. // Reserve reserves a set of hashes for the given peer, skipping any previously
  164. // failed download.
  165. func (q *queue) Reserve(p *peer, max int) *fetchRequest {
  166. q.lock.Lock()
  167. defer q.lock.Unlock()
  168. // Short circuit if the pool has been depleted, or if the peer's already
  169. // downloading something (sanity check not to corrupt state)
  170. if q.hashQueue.Empty() {
  171. return nil
  172. }
  173. if _, ok := q.pendPool[p.id]; ok {
  174. return nil
  175. }
  176. // Retrieve a batch of hashes, skipping previously failed ones
  177. send := make(map[common.Hash]int)
  178. skip := make(map[common.Hash]int)
  179. for len(send) < max && !q.hashQueue.Empty() {
  180. hash, priority := q.hashQueue.Pop()
  181. if p.ignored.Has(hash) {
  182. skip[hash.(common.Hash)] = int(priority)
  183. } else {
  184. send[hash.(common.Hash)] = int(priority)
  185. }
  186. }
  187. // Merge all the skipped hashes back
  188. for hash, index := range skip {
  189. q.hashQueue.Push(hash, float32(index))
  190. }
  191. // Assemble and return the block download request
  192. if len(send) == 0 {
  193. return nil
  194. }
  195. request := &fetchRequest{
  196. Peer: p,
  197. Hashes: send,
  198. Time: time.Now(),
  199. }
  200. q.pendPool[p.id] = request
  201. return request
  202. }
  203. // Cancel aborts a fetch request, returning all pending hashes to the queue.
  204. func (q *queue) Cancel(request *fetchRequest) {
  205. q.lock.Lock()
  206. defer q.lock.Unlock()
  207. for hash, index := range request.Hashes {
  208. q.hashQueue.Push(hash, float32(index))
  209. }
  210. delete(q.pendPool, request.Peer.id)
  211. }
  212. // Expire checks for in flight requests that exceeded a timeout allowance,
  213. // canceling them and returning the responsible peers for penalization.
  214. func (q *queue) Expire(timeout time.Duration) []string {
  215. q.lock.Lock()
  216. defer q.lock.Unlock()
  217. // Iterate over the expired requests and return each to the queue
  218. peers := []string{}
  219. for id, request := range q.pendPool {
  220. if time.Since(request.Time) > timeout {
  221. for hash, index := range request.Hashes {
  222. q.hashQueue.Push(hash, float32(index))
  223. }
  224. peers = append(peers, id)
  225. }
  226. }
  227. // Remove the expired requests from the pending pool
  228. for _, id := range peers {
  229. delete(q.pendPool, id)
  230. }
  231. return peers
  232. }
  233. // Deliver injects a block retrieval response into the download queue.
  234. func (q *queue) Deliver(id string, blocks []*types.Block) (err error) {
  235. q.lock.Lock()
  236. defer q.lock.Unlock()
  237. // Short circuit if the blocks were never requested
  238. request := q.pendPool[id]
  239. if request == nil {
  240. return errors.New("no fetches pending")
  241. }
  242. delete(q.pendPool, id)
  243. // If no blocks were retrieved, mark them as unavailable for the origin peer
  244. if len(blocks) == 0 {
  245. for hash, _ := range request.Hashes {
  246. request.Peer.ignored.Add(hash)
  247. }
  248. }
  249. // Iterate over the downloaded blocks and add each of them
  250. errs := make([]error, 0)
  251. for _, block := range blocks {
  252. // Skip any blocks that fall outside the cache range
  253. index := int(block.NumberU64()) - q.blockOffset
  254. if index >= len(q.blockCache) || index < 0 {
  255. //fmt.Printf("block cache overflown (N=%v O=%v, C=%v)", block.Number(), q.blockOffset, len(q.blockCache))
  256. continue
  257. }
  258. // Skip any blocks that were not requested
  259. hash := block.Hash()
  260. if _, ok := request.Hashes[hash]; !ok {
  261. errs = append(errs, fmt.Errorf("non-requested block %v", hash))
  262. continue
  263. }
  264. // Otherwise merge the block and mark the hash block
  265. q.blockCache[index] = block
  266. delete(request.Hashes, hash)
  267. delete(q.hashPool, hash)
  268. q.blockPool[hash] = int(block.NumberU64())
  269. }
  270. // Return all failed fetches to the queue
  271. for hash, index := range request.Hashes {
  272. q.hashQueue.Push(hash, float32(index))
  273. }
  274. if len(errs) != 0 {
  275. return fmt.Errorf("multiple failures: %v", errs)
  276. }
  277. return nil
  278. }
  279. // Alloc ensures that the block cache is the correct size, given a starting
  280. // offset, and a memory cap.
  281. func (q *queue) Alloc(offset int) {
  282. q.lock.Lock()
  283. defer q.lock.Unlock()
  284. if q.blockOffset < offset {
  285. q.blockOffset = offset
  286. }
  287. size := len(q.hashPool)
  288. if size > blockCacheLimit {
  289. size = blockCacheLimit
  290. }
  291. if len(q.blockCache) < size {
  292. q.blockCache = append(q.blockCache, make([]*types.Block, size-len(q.blockCache))...)
  293. }
  294. }