block_pool.go 7.7 KB


  1. package eth
  2. import (
  3. "bytes"
  4. "container/list"
  5. "fmt"
  6. "math"
  7. "math/big"
  8. "sync"
  9. "time"
  10. "github.com/ethereum/go-ethereum/core/types"
  11. "github.com/ethereum/go-ethereum/ethutil"
  12. "github.com/ethereum/go-ethereum/logger"
  13. "github.com/ethereum/go-ethereum/wire"
  14. )
  15. var poollogger = logger.NewLogger("BPOOL")
  16. type block struct {
  17. from *Peer
  18. peer *Peer
  19. block *types.Block
  20. reqAt time.Time
  21. requested int
  22. }
  23. type BlockPool struct {
  24. mut sync.Mutex
  25. eth *Ethereum
  26. hashes [][]byte
  27. pool map[string]*block
  28. td *big.Int
  29. quit chan bool
  30. fetchingHashes bool
  31. downloadStartedAt time.Time
  32. ChainLength, BlocksProcessed int
  33. peer *Peer
  34. }
  35. func NewBlockPool(eth *Ethereum) *BlockPool {
  36. return &BlockPool{
  37. eth: eth,
  38. pool: make(map[string]*block),
  39. td: ethutil.Big0,
  40. quit: make(chan bool),
  41. }
  42. }
  43. func (self *BlockPool) Len() int {
  44. return len(self.hashes)
  45. }
  46. func (self *BlockPool) Reset() {
  47. self.pool = make(map[string]*block)
  48. self.hashes = nil
  49. }
  50. func (self *BlockPool) HasLatestHash() bool {
  51. self.mut.Lock()
  52. defer self.mut.Unlock()
  53. return self.pool[string(self.eth.ChainManager().CurrentBlock.Hash())] != nil
  54. }
  55. func (self *BlockPool) HasCommonHash(hash []byte) bool {
  56. return self.eth.ChainManager().GetBlock(hash) != nil
  57. }
  58. func (self *BlockPool) Blocks() (blocks types.Blocks) {
  59. for _, item := range self.pool {
  60. if item.block != nil {
  61. blocks = append(blocks, item.block)
  62. }
  63. }
  64. return
  65. }
  66. func (self *BlockPool) FetchHashes(peer *Peer) bool {
  67. highestTd := self.eth.HighestTDPeer()
  68. if (self.peer == nil && peer.td.Cmp(highestTd) >= 0) || (self.peer != nil && peer.td.Cmp(self.peer.td) > 0) || self.peer == peer {
  69. if self.peer != peer {
  70. poollogger.Debugf("Found better suitable peer (%v vs %v)\n", self.td, peer.td)
  71. if self.peer != nil {
  72. self.peer.doneFetchingHashes = true
  73. }
  74. }
  75. self.peer = peer
  76. self.td = peer.td
  77. if !self.HasLatestHash() {
  78. peer.doneFetchingHashes = false
  79. const amount = 256
  80. peerlogger.Debugf("Fetching hashes (%d) %x...\n", amount, peer.lastReceivedHash[0:4])
  81. peer.QueueMessage(wire.NewMessage(wire.MsgGetBlockHashesTy, []interface{}{peer.lastReceivedHash, uint32(amount)}))
  82. }
  83. return true
  84. }
  85. return false
  86. }
  87. func (self *BlockPool) AddHash(hash []byte, peer *Peer) {
  88. self.mut.Lock()
  89. defer self.mut.Unlock()
  90. if self.pool[string(hash)] == nil {
  91. self.pool[string(hash)] = &block{peer, nil, nil, time.Now(), 0}
  92. self.hashes = append([][]byte{hash}, self.hashes...)
  93. }
  94. }
  95. func (self *BlockPool) Add(b *types.Block, peer *Peer) {
  96. self.addBlock(b, peer, false)
  97. }
  98. func (self *BlockPool) AddNew(b *types.Block, peer *Peer) {
  99. self.addBlock(b, peer, true)
  100. }
  101. func (self *BlockPool) addBlock(b *types.Block, peer *Peer, newBlock bool) {
  102. self.mut.Lock()
  103. defer self.mut.Unlock()
  104. hash := string(b.Hash())
  105. if self.pool[hash] == nil && !self.eth.ChainManager().HasBlock(b.Hash()) {
  106. poollogger.Infof("Got unrequested block (%x...)\n", hash[0:4])
  107. self.hashes = append(self.hashes, b.Hash())
  108. self.pool[hash] = &block{peer, peer, b, time.Now(), 0}
  109. // The following is only performed on an unrequested new block
  110. if newBlock {
  111. fmt.Println("1.", !self.eth.ChainManager().HasBlock(b.PrevHash), ethutil.Bytes2Hex(b.Hash()[0:4]), ethutil.Bytes2Hex(b.PrevHash[0:4]))
  112. fmt.Println("2.", self.pool[string(b.PrevHash)] == nil)
  113. fmt.Println("3.", !self.fetchingHashes)
  114. if !self.eth.ChainManager().HasBlock(b.PrevHash) && self.pool[string(b.PrevHash)] == nil && !self.fetchingHashes {
  115. poollogger.Infof("Unknown chain, requesting (%x...)\n", b.PrevHash[0:4])
  116. peer.QueueMessage(wire.NewMessage(wire.MsgGetBlockHashesTy, []interface{}{b.Hash(), uint32(256)}))
  117. }
  118. }
  119. } else if self.pool[hash] != nil {
  120. self.pool[hash].block = b
  121. }
  122. self.BlocksProcessed++
  123. }
  124. func (self *BlockPool) Remove(hash []byte) {
  125. self.mut.Lock()
  126. defer self.mut.Unlock()
  127. self.hashes = ethutil.DeleteFromByteSlice(self.hashes, hash)
  128. delete(self.pool, string(hash))
  129. }
  130. func (self *BlockPool) DistributeHashes() {
  131. self.mut.Lock()
  132. defer self.mut.Unlock()
  133. var (
  134. peerLen = self.eth.peers.Len()
  135. amount = 256 * peerLen
  136. dist = make(map[*Peer][][]byte)
  137. )
  138. num := int(math.Min(float64(amount), float64(len(self.pool))))
  139. for i, j := 0, 0; i < len(self.hashes) && j < num; i++ {
  140. hash := self.hashes[i]
  141. item := self.pool[string(hash)]
  142. if item != nil && item.block == nil {
  143. var peer *Peer
  144. lastFetchFailed := time.Since(item.reqAt) > 5*time.Second
  145. // Handle failed requests
  146. if lastFetchFailed && item.requested > 5 && item.peer != nil {
  147. if item.requested < 100 {
  148. // Select peer the hash was retrieved off
  149. peer = item.from
  150. } else {
  151. // Remove it
  152. self.hashes = ethutil.DeleteFromByteSlice(self.hashes, hash)
  153. delete(self.pool, string(hash))
  154. }
  155. } else if lastFetchFailed || item.peer == nil {
  156. // Find a suitable, available peer
  157. eachPeer(self.eth.peers, func(p *Peer, v *list.Element) {
  158. if peer == nil && len(dist[p]) < amount/peerLen && p.statusKnown {
  159. peer = p
  160. }
  161. })
  162. }
  163. if peer != nil {
  164. item.reqAt = time.Now()
  165. item.peer = peer
  166. item.requested++
  167. dist[peer] = append(dist[peer], hash)
  168. }
  169. }
  170. }
  171. for peer, hashes := range dist {
  172. peer.FetchBlocks(hashes)
  173. }
  174. if len(dist) > 0 {
  175. self.downloadStartedAt = time.Now()
  176. }
  177. }
  178. func (self *BlockPool) Start() {
  179. go self.downloadThread()
  180. go self.chainThread()
  181. }
  182. func (self *BlockPool) Stop() {
  183. close(self.quit)
  184. }
  185. func (self *BlockPool) downloadThread() {
  186. serviceTimer := time.NewTicker(100 * time.Millisecond)
  187. out:
  188. for {
  189. select {
  190. case <-self.quit:
  191. break out
  192. case <-serviceTimer.C:
  193. // Check if we're catching up. If not distribute the hashes to
  194. // the peers and download the blockchain
  195. self.fetchingHashes = false
  196. eachPeer(self.eth.peers, func(p *Peer, v *list.Element) {
  197. if p.statusKnown && p.FetchingHashes() {
  198. self.fetchingHashes = true
  199. }
  200. })
  201. if len(self.hashes) > 0 {
  202. self.DistributeHashes()
  203. }
  204. if self.ChainLength < len(self.hashes) {
  205. self.ChainLength = len(self.hashes)
  206. }
  207. /*
  208. if !self.fetchingHashes {
  209. blocks := self.Blocks()
  210. chain.BlockBy(chain.Number).Sort(blocks)
  211. if len(blocks) > 0 {
  212. if !self.eth.ChainManager().HasBlock(b.PrevHash) && self.pool[string(b.PrevHash)] == nil && !self.fetchingHashes {
  213. }
  214. }
  215. }
  216. */
  217. }
  218. }
  219. }
  220. func (self *BlockPool) chainThread() {
  221. procTimer := time.NewTicker(500 * time.Millisecond)
  222. out:
  223. for {
  224. select {
  225. case <-self.quit:
  226. break out
  227. case <-procTimer.C:
  228. blocks := self.Blocks()
  229. types.BlockBy(types.Number).Sort(blocks)
  230. // Find common block
  231. for i, block := range blocks {
  232. if self.eth.ChainManager().HasBlock(block.PrevHash) {
  233. blocks = blocks[i:]
  234. break
  235. }
  236. }
  237. if len(blocks) > 0 {
  238. if self.eth.ChainManager().HasBlock(blocks[0].PrevHash) {
  239. for i, block := range blocks[1:] {
  240. // NOTE: The Ith element in this loop refers to the previous block in
  241. // outer "blocks"
  242. if bytes.Compare(block.PrevHash, blocks[i].Hash()) != 0 {
  243. blocks = blocks[:i]
  244. break
  245. }
  246. }
  247. } else {
  248. blocks = nil
  249. }
  250. }
  251. if len(blocks) > 0 {
  252. chainman := self.eth.ChainManager()
  253. err := chainman.InsertChain(blocks)
  254. if err != nil {
  255. poollogger.Debugln(err)
  256. self.Reset()
  257. if self.peer != nil && self.peer.conn != nil {
  258. poollogger.Debugf("Punishing peer for supplying bad chain (%v)\n", self.peer.conn.RemoteAddr())
  259. }
  260. // This peer gave us bad hashes and made us fetch a bad chain, therefor he shall be punished.
  261. self.eth.BlacklistPeer(self.peer)
  262. self.peer.StopWithReason(DiscBadPeer)
  263. self.td = ethutil.Big0
  264. self.peer = nil
  265. }
  266. for _, block := range blocks {
  267. self.Remove(block.Hash())
  268. }
  269. }
  270. }
  271. }
  272. }