obscuren 11 years ago
parent
commit
ba43364f36
3 changed files with 154 additions and 45 deletions
  1. 95 6
      block_pool.go
  2. 0 1
      ethereum.go
  3. 59 38
      peer.go

+ 95 - 6
block_pool.go

@@ -1,17 +1,23 @@
 package eth
 
 import (
+	"bytes"
+	"container/list"
+	"fmt"
 	"math"
 	"math/big"
 	"sync"
+	"time"
 
 	"github.com/ethereum/eth-go/ethchain"
 	"github.com/ethereum/eth-go/ethutil"
 )
 
 type block struct {
-	peer  *Peer
-	block *ethchain.Block
+	peer      *Peer
+	block     *ethchain.Block
+	reqAt     time.Time
+	requested int
 }
 
 type BlockPool struct {
@@ -22,7 +28,8 @@ type BlockPool struct {
 	hashPool [][]byte
 	pool     map[string]*block
 
-	td *big.Int
+	td   *big.Int
+	quit chan bool
 }
 
 func NewBlockPool(eth *Ethereum) *BlockPool {
@@ -30,6 +37,7 @@ func NewBlockPool(eth *Ethereum) *BlockPool {
 		eth:  eth,
 		pool: make(map[string]*block),
 		td:   ethutil.Big0,
+		quit: make(chan bool),
 	}
 }
 
@@ -47,7 +55,7 @@ func (self *BlockPool) HasCommonHash(hash []byte) bool {
 
 func (self *BlockPool) AddHash(hash []byte) {
 	if self.pool[string(hash)] == nil {
-		self.pool[string(hash)] = &block{nil, nil}
+		self.pool[string(hash)] = &block{nil, nil, time.Now(), 0}
 
 		self.hashPool = append([][]byte{hash}, self.hashPool...)
 	}
@@ -58,12 +66,34 @@ func (self *BlockPool) SetBlock(b *ethchain.Block, peer *Peer) {
 
 	if self.pool[hash] == nil && !self.eth.BlockChain().HasBlock(b.Hash()) {
 		self.hashPool = append(self.hashPool, b.Hash())
-		self.pool[hash] = &block{peer, b}
+		self.pool[hash] = &block{peer, b, time.Now(), 0}
 	} else if self.pool[hash] != nil {
 		self.pool[hash].block = b
 	}
 }
 
+func (self *BlockPool) getParent(block *ethchain.Block) *ethchain.Block {
+	for _, item := range self.pool {
+		if item.block != nil {
+			if bytes.Compare(item.block.Hash(), block.PrevHash) == 0 {
+				return item.block
+			}
+		}
+	}
+
+	return nil
+}
+
+func (self *BlockPool) GetChainFromBlock(block *ethchain.Block) ethchain.Blocks {
+	var blocks ethchain.Blocks
+
+	for b := block; b != nil; b = self.getParent(b) {
+		blocks = append(ethchain.Blocks{b}, blocks...)
+	}
+
+	return blocks
+}
+
 func (self *BlockPool) CheckLinkAndProcess(f func(block *ethchain.Block)) {
 
 	var blocks ethchain.Blocks
@@ -94,8 +124,14 @@ func (self *BlockPool) Take(amount int, peer *Peer) (hashes [][]byte) {
 	j := 0
 	for i := 0; i < len(self.hashPool) && j < num; i++ {
 		hash := string(self.hashPool[i])
-		if self.pool[hash] != nil && (self.pool[hash].peer == nil || self.pool[hash].peer == peer) && self.pool[hash].block == nil {
+		item := self.pool[hash]
+		if item != nil && item.block == nil &&
+			(item.peer == nil ||
+				((time.Since(item.reqAt) > 5*time.Second && item.peer != peer) && self.eth.peers.Len() > 1) || // multiple peers
+				(time.Since(item.reqAt) > 5*time.Second && self.eth.peers.Len() == 1) /* single peer*/) {
 			self.pool[hash].peer = peer
+			self.pool[hash].reqAt = time.Now()
+			self.pool[hash].requested++
 
 			hashes = append(hashes, self.hashPool[i])
 			j++
@@ -104,3 +140,56 @@ func (self *BlockPool) Take(amount int, peer *Peer) (hashes [][]byte) {
 
 	return
 }
+
+func (self *BlockPool) Start() {
+	go self.update()
+}
+
+func (self *BlockPool) Stop() {
+	close(self.quit)
+}
+
+func (self *BlockPool) update() {
+	serviceTimer := time.NewTicker(100 * time.Millisecond)
+	procTimer := time.NewTicker(500 * time.Millisecond)
+out:
+	for {
+		select {
+		case <-self.quit:
+			break out
+		case <-serviceTimer.C:
+			// Clean up hashes that can't be fetched
+			done := true
+			eachPeer(self.eth.peers, func(p *Peer, v *list.Element) {
+				if p.statusKnown && p.FetchingHashes() {
+					done = false
+				}
+			})
+
+			if done {
+				eachPeer(self.eth.peers, func(p *Peer, v *list.Element) {
+					if p.statusKnown {
+						hashes := self.Take(100, p)
+						if len(hashes) > 0 {
+							p.FetchBlocks(hashes)
+							if len(hashes) == 1 {
+								fmt.Printf("last hash = %x\n", hashes[0])
+							} else {
+								fmt.Println("Requesting", len(hashes), "of", p)
+							}
+						}
+					}
+				})
+			}
+		case <-procTimer.C:
+			var err error
+			self.CheckLinkAndProcess(func(block *ethchain.Block) {
+				err = self.eth.StateManager().Process(block, false)
+			})
+
+			if err != nil {
+				peerlogger.Infoln(err)
+			}
+		}
+	}
+}

+ 0 - 1
ethereum.go

@@ -383,7 +383,6 @@ func (s *Ethereum) ReapDeadPeerHandler() {
 // Start the ethereum
 func (s *Ethereum) Start(seed bool) {
 	s.reactor.Start()
-	s.blockPool.Start()
 	// Bind to addr and port
 	ln, err := net.Listen("tcp", ":"+s.Port)
 	if err != nil {

+ 59 - 38
peer.go

@@ -131,6 +131,7 @@ type Peer struct {
 	// Last received pong message
 	lastPong          int64
 	lastBlockReceived time.Time
+	LastHashReceived  time.Time
 
 	host             []byte
 	port             uint16
@@ -176,6 +177,7 @@ func NewPeer(conn net.Conn, ethereum *Ethereum, inbound bool) *Peer {
 		caps:            ethereum.ServerCaps(),
 		version:         ethereum.ClientIdentity().String(),
 		protocolCaps:    ethutil.NewValue(nil),
+		td:              big.NewInt(0),
 	}
 }
 
@@ -191,6 +193,7 @@ func NewOutboundPeer(addr string, ethereum *Ethereum, caps Caps) *Peer {
 		caps:         caps,
 		version:      ethereum.ClientIdentity().String(),
 		protocolCaps: ethutil.NewValue(nil),
+		td:           big.NewInt(0),
 	}
 
 	// Set up the connection in another goroutine so we don't block the main thread
@@ -505,6 +508,9 @@ func (p *Peer) HandleInbound() {
 					for it.Next() {
 						hash := it.Value().Bytes()
 
+						p.lastReceivedHash = hash
+						p.LastHashReceived = time.Now()
+
 						if blockPool.HasCommonHash(hash) {
 							foundCommonHash = true
 
@@ -512,15 +518,16 @@ func (p *Peer) HandleInbound() {
 						}
 
 						blockPool.AddHash(hash)
-
-						p.lastReceivedHash = hash
-
-						p.lastBlockReceived = time.Now()
 					}
 
-					if foundCommonHash || msg.Data.Len() == 0 {
-						p.FetchBlocks()
-					} else {
+					/*
+						if foundCommonHash || msg.Data.Len() == 0 {
+							p.FetchBlocks()
+						} else {
+							p.FetchHashes()
+						}
+					*/
+					if !foundCommonHash && msg.Data.Len() != 0 {
 						p.FetchHashes()
 					}
 
@@ -539,19 +546,21 @@ func (p *Peer) HandleInbound() {
 						p.lastBlockReceived = time.Now()
 					}
 
-					var err error
-					blockPool.CheckLinkAndProcess(func(block *ethchain.Block) {
-						err = p.ethereum.StateManager().Process(block, false)
-					})
-
-					if err != nil {
-						peerlogger.Infoln(err)
-					} else {
-						// Don't trigger if there's just one block.
-						if blockPool.Len() != 0 && msg.Data.Len() > 1 {
-							p.FetchBlocks()
+					/*
+						var err error
+						blockPool.CheckLinkAndProcess(func(block *ethchain.Block) {
+							err = p.ethereum.StateManager().Process(block, false)
+						})
+
+						if err != nil {
+							peerlogger.Infoln(err)
+						} else {
+							// Don't trigger if there's just one block.
+							if blockPool.Len() != 0 && msg.Data.Len() > 1 {
+								p.FetchBlocks()
+							}
 						}
-					}
+					*/
 				}
 			}
 		}
@@ -560,10 +569,7 @@ func (p *Peer) HandleInbound() {
 	p.Stop()
 }
 
-func (self *Peer) FetchBlocks() {
-	blockPool := self.ethereum.blockPool
-
-	hashes := blockPool.Take(100, self)
+func (self *Peer) FetchBlocks(hashes [][]byte) {
 	if len(hashes) > 0 {
 		self.QueueMessage(ethwire.NewMessage(ethwire.MsgGetBlocksTy, ethutil.ByteSliceToInterface(hashes)))
 	}
@@ -572,7 +578,7 @@ func (self *Peer) FetchBlocks() {
 func (self *Peer) FetchHashes() {
 	blockPool := self.ethereum.blockPool
 
-	if self.td.Cmp(blockPool.td) >= 0 {
+	if self.td.Cmp(self.ethereum.HighestTDPeer()) >= 0 {
 		blockPool.td = self.td
 
 		if !blockPool.HasLatestHash() {
@@ -581,6 +587,10 @@ func (self *Peer) FetchHashes() {
 	}
 }
 
+func (self *Peer) FetchingHashes() bool {
+	return time.Since(self.LastHashReceived) < 5*time.Second
+}
+
 // General update method
 func (self *Peer) update() {
 	serviceTimer := time.NewTicker(5 * time.Second)
@@ -589,11 +599,22 @@ out:
 	for {
 		select {
 		case <-serviceTimer.C:
-			since := time.Since(self.lastBlockReceived)
-			if since > 10*time.Second && self.ethereum.blockPool.Len() != 0 && self.IsCap("eth") {
-				self.FetchHashes()
-			} else if since > 5*time.Second {
-				self.catchingUp = false
+			if self.IsCap("eth") {
+				var (
+					sinceBlock = time.Since(self.lastBlockReceived)
+					sinceHash  = time.Since(self.LastHashReceived)
+				)
+
+				if sinceBlock > 5*time.Second && sinceHash > 5*time.Second {
+					self.catchingUp = false
+				}
+
+				if sinceHash > 10*time.Second && self.ethereum.blockPool.Len() != 0 {
+					// XXX While this is completely and utterly incorrect, in order to do anything on the test net is to do it this way
+					// Assume that when fetching hashes timeouts, we are done.
+					//self.FetchHashes()
+					//self.FetchBlocks()
+				}
 			}
 		case <-self.quit:
 			break out
@@ -761,6 +782,14 @@ func (p *Peer) handleHandshake(msg *ethwire.Msg) {
 		return
 	}
 
+	// Self connect detection
+	pubkey := p.ethereum.KeyManager().PublicKey()
+	if bytes.Compare(pubkey[1:], pub) == 0 {
+		p.Stop()
+
+		return
+	}
+
 	usedPub := 0
 	// This peer is already added to the peerlist so we expect to find a double pubkey at least once
 	eachPeer(p.ethereum.Peers(), func(peer *Peer, e *list.Element) {
@@ -779,16 +808,8 @@ func (p *Peer) handleHandshake(msg *ethwire.Msg) {
 	// If this is an inbound connection send an ack back
 	if p.inbound {
 		p.port = uint16(port)
-
-		// Self connect detection
-		pubkey := p.ethereum.KeyManager().PublicKey()
-		if bytes.Compare(pubkey, p.pubkey) == 0 {
-			p.Stop()
-
-			return
-		}
-
 	}
+
 	p.SetVersion(clientId)
 
 	p.versionKnown = true