瀏覽代碼

Block pool is thread safe

obscuren 11 年之前
父節點
當前提交
ea0357bf02
共有 3 個文件被更改,包括 69 次插入53 次删除
  1. 58 43
      block_pool.go
  2. 6 4
      ethchain/state_manager.go
  3. 5 6
      peer.go

+ 58 - 43
block_pool.go

@@ -1,7 +1,6 @@
 package eth
 
 import (
-	"bytes"
 	"container/list"
 	"math"
 	"math/big"
@@ -35,6 +34,9 @@ type BlockPool struct {
 	td   *big.Int
 	quit chan bool
 
+	fetchingHashes    bool
+	downloadStartedAt time.Time
+
 	ChainLength, BlocksProcessed int
 }
 
@@ -52,6 +54,9 @@ func (self *BlockPool) Len() int {
 }
 
 func (self *BlockPool) HasLatestHash() bool {
+	self.mut.Lock()
+	defer self.mut.Unlock()
+
 	return self.pool[string(self.eth.BlockChain().CurrentBlock.Hash())] != nil
 }
 
@@ -59,7 +64,20 @@ func (self *BlockPool) HasCommonHash(hash []byte) bool {
 	return self.eth.BlockChain().GetBlock(hash) != nil
 }
 
+func (self *BlockPool) Blocks() (blocks ethchain.Blocks) {
+	for _, item := range self.pool {
+		if item.block != nil {
+			blocks = append(blocks, item.block)
+		}
+	}
+
+	return
+}
+
 func (self *BlockPool) AddHash(hash []byte, peer *Peer) {
+	self.mut.Lock()
+	defer self.mut.Unlock()
+
 	if self.pool[string(hash)] == nil {
 		self.pool[string(hash)] = &block{peer, nil, nil, time.Now(), 0}
 
@@ -67,7 +85,10 @@ func (self *BlockPool) AddHash(hash []byte, peer *Peer) {
 	}
 }
 
-func (self *BlockPool) SetBlock(b *ethchain.Block, peer *Peer) {
+func (self *BlockPool) Add(b *ethchain.Block, peer *Peer) {
+	self.mut.Lock()
+	defer self.mut.Unlock()
+
 	hash := string(b.Hash())
 
 	if self.pool[hash] == nil && !self.eth.BlockChain().HasBlock(b.Hash()) {
@@ -76,7 +97,7 @@ func (self *BlockPool) SetBlock(b *ethchain.Block, peer *Peer) {
 		self.hashPool = append(self.hashPool, b.Hash())
 		self.pool[hash] = &block{peer, peer, b, time.Now(), 0}
 
-		if !self.eth.BlockChain().HasBlock(b.PrevHash) {
+		if !self.eth.BlockChain().HasBlock(b.PrevHash) && !self.fetchingHashes {
 			poollogger.Infof("Unknown block, requesting parent (%x...)\n", b.PrevHash[0:4])
 			peer.QueueMessage(ethwire.NewMessage(ethwire.MsgGetBlockHashesTy, []interface{}{b.PrevHash, uint32(256)}))
 		}
@@ -87,36 +108,12 @@ func (self *BlockPool) SetBlock(b *ethchain.Block, peer *Peer) {
 	self.BlocksProcessed++
 }
 
-func (self *BlockPool) getParent(block *ethchain.Block) *ethchain.Block {
-	for _, item := range self.pool {
-		if item.block != nil {
-			if bytes.Compare(item.block.Hash(), block.PrevHash) == 0 {
-				return item.block
-			}
-		}
-	}
-
-	return nil
-}
-
-func (self *BlockPool) GetChainFromBlock(block *ethchain.Block) ethchain.Blocks {
-	var blocks ethchain.Blocks
-
-	for b := block; b != nil; b = self.getParent(b) {
-		blocks = append(ethchain.Blocks{b}, blocks...)
-	}
-
-	return blocks
-}
-
-func (self *BlockPool) Blocks() (blocks ethchain.Blocks) {
-	for _, item := range self.pool {
-		if item.block != nil {
-			blocks = append(blocks, item.block)
-		}
-	}
+func (self *BlockPool) Remove(hash []byte) {
+	self.mut.Lock()
+	defer self.mut.Unlock()
 
-	return
+	self.hashPool = ethutil.DeleteFromByteSlice(self.hashPool, hash)
+	delete(self.pool, string(hash))
 }
 
 func (self *BlockPool) ProcessCanonical(f func(block *ethchain.Block)) (procAmount int) {
@@ -129,9 +126,7 @@ func (self *BlockPool) ProcessCanonical(f func(block *ethchain.Block)) (procAmou
 
 			f(block)
 
-			hash := block.Hash()
-			self.hashPool = ethutil.DeleteFromByteSlice(self.hashPool, hash)
-			delete(self.pool, string(hash))
+			self.Remove(block.Hash())
 		}
 
 	}
@@ -140,9 +135,12 @@ func (self *BlockPool) ProcessCanonical(f func(block *ethchain.Block)) (procAmou
 }
 
 func (self *BlockPool) DistributeHashes() {
+	self.mut.Lock()
+	defer self.mut.Unlock()
+
 	var (
 		peerLen = self.eth.peers.Len()
-		amount  = 200 * peerLen
+		amount  = 256 * peerLen
 		dist    = make(map[*Peer][][]byte)
 	)
 
@@ -156,7 +154,7 @@ func (self *BlockPool) DistributeHashes() {
 			lastFetchFailed := time.Since(item.reqAt) > 5*time.Second
 
 			// Handle failed requests
-			if lastFetchFailed && item.requested > 0 && item.peer != nil {
+			if lastFetchFailed && item.requested > 5 && item.peer != nil {
 				if item.requested < 100 {
 					// Select peer the hash was retrieved off
 					peer = item.from
@@ -187,19 +185,23 @@ func (self *BlockPool) DistributeHashes() {
 	for peer, hashes := range dist {
 		peer.FetchBlocks(hashes)
 	}
+
+	if len(dist) > 0 {
+		self.downloadStartedAt = time.Now()
+	}
 }
 
 func (self *BlockPool) Start() {
-	go self.update()
+	go self.downloadThread()
+	go self.chainThread()
 }
 
 func (self *BlockPool) Stop() {
 	close(self.quit)
 }
 
-func (self *BlockPool) update() {
+func (self *BlockPool) downloadThread() {
 	serviceTimer := time.NewTicker(100 * time.Millisecond)
-	procTimer := time.NewTicker(500 * time.Millisecond)
 out:
 	for {
 		select {
@@ -208,20 +210,31 @@ out:
 		case <-serviceTimer.C:
 			// Check if we're catching up. If not distribute the hashes to
 			// the peers and download the blockchain
-			done := true
+			self.fetchingHashes = false
 			eachPeer(self.eth.peers, func(p *Peer, v *list.Element) {
 				if p.statusKnown && p.FetchingHashes() {
-					done = false
+					self.fetchingHashes = true
 				}
 			})
 
-			if done && len(self.hashPool) > 0 {
+			if !self.fetchingHashes && len(self.hashPool) > 0 {
 				self.DistributeHashes()
 			}
 
 			if self.ChainLength < len(self.hashPool) {
 				self.ChainLength = len(self.hashPool)
 			}
+		}
+	}
+}
+
+func (self *BlockPool) chainThread() {
+	procTimer := time.NewTicker(500 * time.Millisecond)
+out:
+	for {
+		select {
+		case <-self.quit:
+			break out
 		case <-procTimer.C:
 			// XXX We can optimize this lifting this on to a new goroutine.
 			// We'd need to make sure that the pools are properly protected by a mutex
@@ -230,6 +243,8 @@ out:
 				err := self.eth.StateManager().Process(block, false)
 				if err != nil {
 					poollogger.Infoln(err)
+					poollogger.Debugf("Block #%v failed (%x...)\n", block.Number, block.Hash()[0:4])
+					poollogger.Debugln(block)
 				}
 			})
 

+ 6 - 4
ethchain/state_manager.go

@@ -143,9 +143,6 @@ done:
 			}
 		}
 
-		// Notify all subscribers
-		self.Ethereum.Reactor().Post("newTx:post", tx)
-
 		// Update the state with pending changes
 		state.Update()
 
@@ -160,10 +157,15 @@ done:
 					os.Exit(1)
 				}
 
-				return nil, nil, nil, fmt.Errorf("err diff #%d (r) %v ~ %x  <=>  (c) %v ~ %x (%x)\n", i+1, original.CumulativeGasUsed, original.PostState[0:4], receipt.CumulativeGasUsed, receipt.PostState[0:4], receipt.Tx.Hash())
+				err := fmt.Errorf("#%d receipt failed (r) %v ~ %x  <=>  (c) %v ~ %x (%x...)", i+1, original.CumulativeGasUsed, original.PostState[0:4], receipt.CumulativeGasUsed, receipt.PostState[0:4], receipt.Tx.Hash()[0:4])
+
+				return nil, nil, nil, err
 			}
 		}
 
+		// Notify all subscribers
+		self.Ethereum.Reactor().Post("newTx:post", tx)
+
 		receipts = append(receipts, receipt)
 		handled = append(handled, tx)
 

+ 5 - 6
peer.go

@@ -503,16 +503,15 @@ func (p *Peer) HandleInbound() {
 					it := msg.Data.NewIterator()
 					for it.Next() {
 						hash := it.Value().Bytes()
-
-						p.lastReceivedHash = hash
-						p.LastHashReceived = time.Now()
-
 						if blockPool.HasCommonHash(hash) {
 							foundCommonHash = true
 
 							break
 						}
 
+						p.lastReceivedHash = hash
+						p.LastHashReceived = time.Now()
+
 						blockPool.AddHash(hash, p)
 					}
 
@@ -530,7 +529,7 @@ func (p *Peer) HandleInbound() {
 						block := ethchain.NewBlockFromRlpValue(it.Value())
 						//fmt.Printf("%v %x - %x\n", block.Number, block.Hash()[0:4], block.PrevHash[0:4])
 
-						blockPool.SetBlock(block, p)
+						blockPool.Add(block, p)
 
 						p.lastBlockReceived = time.Now()
 					}
@@ -561,7 +560,7 @@ func (self *Peer) FetchHashes() {
 }
 
 func (self *Peer) FetchingHashes() bool {
-	return time.Since(self.LastHashReceived) < 5*time.Second
+	return time.Since(self.LastHashReceived) < 200*time.Millisecond
 }
 
 // General update method