Browse Source

eth: skip transaction handling during fast sync

Péter Szilágyi 9 years ago
parent
commit
d87f7a1e81
3 changed files with 23 additions and 14 deletions
  1. 15 8
      eth/handler.go
  2. 4 3
      eth/sync.go
  3. 4 3
      eth/sync_test.go

+ 15 - 8
eth/handler.go

@@ -22,6 +22,7 @@ import (
 	"math"
 	"math"
 	"math/big"
 	"math/big"
 	"sync"
 	"sync"
+	"sync/atomic"
 	"time"
 	"time"
 
 
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/common"
@@ -58,7 +59,7 @@ type blockFetcherFn func([]common.Hash) error
 type ProtocolManager struct {
 type ProtocolManager struct {
 	networkId int
 	networkId int
 
 
-	fastSync   bool
+	fastSync   uint32
 	txpool     txPool
 	txpool     txPool
 	blockchain *core.BlockChain
 	blockchain *core.BlockChain
 	chaindb    ethdb.Database
 	chaindb    ethdb.Database
@@ -87,15 +88,9 @@ type ProtocolManager struct {
 // NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
 // NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
 // with the ethereum network.
 // with the ethereum network.
 func NewProtocolManager(config *core.ChainConfig, fastSync bool, networkId int, mux *event.TypeMux, txpool txPool, pow pow.PoW, blockchain *core.BlockChain, chaindb ethdb.Database) (*ProtocolManager, error) {
 func NewProtocolManager(config *core.ChainConfig, fastSync bool, networkId int, mux *event.TypeMux, txpool txPool, pow pow.PoW, blockchain *core.BlockChain, chaindb ethdb.Database) (*ProtocolManager, error) {
-	// Figure out whether to allow fast sync or not
-	if fastSync && blockchain.CurrentBlock().NumberU64() > 0 {
-		glog.V(logger.Info).Infof("blockchain not empty, fast sync disabled")
-		fastSync = false
-	}
 	// Create the protocol manager with the base fields
 	// Create the protocol manager with the base fields
 	manager := &ProtocolManager{
 	manager := &ProtocolManager{
 		networkId:   networkId,
 		networkId:   networkId,
-		fastSync:    fastSync,
 		eventMux:    mux,
 		eventMux:    mux,
 		txpool:      txpool,
 		txpool:      txpool,
 		blockchain:  blockchain,
 		blockchain:  blockchain,
@@ -106,6 +101,14 @@ func NewProtocolManager(config *core.ChainConfig, fastSync bool, networkId int,
 		txsyncCh:    make(chan *txsync),
 		txsyncCh:    make(chan *txsync),
 		quitSync:    make(chan struct{}),
 		quitSync:    make(chan struct{}),
 	}
 	}
+	// Figure out whether to allow fast sync or not
+	if fastSync && blockchain.CurrentBlock().NumberU64() > 0 {
+		glog.V(logger.Info).Infof("blockchain not empty, fast sync disabled")
+		fastSync = false
+	}
+	if fastSync {
+		manager.fastSync = uint32(1)
+	}
 	// Initiate a sub-protocol for every implemented version we can handle
 	// Initiate a sub-protocol for every implemented version we can handle
 	manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions))
 	manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions))
 	for i, version := range ProtocolVersions {
 	for i, version := range ProtocolVersions {
@@ -678,7 +681,11 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
 		}
 		}
 
 
 	case msg.Code == TxMsg:
 	case msg.Code == TxMsg:
-		// Transactions arrived, parse all of them and deliver to the pool
+		// Transactions arrived, make sure we have a valid chain to handle them
+		if atomic.LoadUint32(&pm.fastSync) == 1 {
+			break
+		}
+		// Transactions can be processed, parse all of them and deliver to the pool
 		var txs []*types.Transaction
 		var txs []*types.Transaction
 		if err := msg.Decode(&txs); err != nil {
 		if err := msg.Decode(&txs); err != nil {
 			return errResp(ErrDecode, "msg %v: %v", msg, err)
 			return errResp(ErrDecode, "msg %v: %v", msg, err)

+ 4 - 3
eth/sync.go

@@ -18,6 +18,7 @@ package eth
 
 
 import (
 import (
 	"math/rand"
 	"math/rand"
+	"sync/atomic"
 	"time"
 	"time"
 
 
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/common"
@@ -167,18 +168,18 @@ func (pm *ProtocolManager) synchronise(peer *peer) {
 	}
 	}
 	// Otherwise try to sync with the downloader
 	// Otherwise try to sync with the downloader
 	mode := downloader.FullSync
 	mode := downloader.FullSync
-	if pm.fastSync {
+	if atomic.LoadUint32(&pm.fastSync) == 1 {
 		mode = downloader.FastSync
 		mode = downloader.FastSync
 	}
 	}
 	if err := pm.downloader.Synchronise(peer.id, peer.Head(), peer.Td(), mode); err != nil {
 	if err := pm.downloader.Synchronise(peer.id, peer.Head(), peer.Td(), mode); err != nil {
 		return
 		return
 	}
 	}
 	// If fast sync was enabled, and we synced up, disable it
 	// If fast sync was enabled, and we synced up, disable it
-	if pm.fastSync {
+	if atomic.LoadUint32(&pm.fastSync) == 1 {
 		// Disable fast sync if we indeed have something in our chain
 		// Disable fast sync if we indeed have something in our chain
 		if pm.blockchain.CurrentBlock().NumberU64() > 0 {
 		if pm.blockchain.CurrentBlock().NumberU64() > 0 {
 			glog.V(logger.Info).Infof("fast sync complete, auto disabling")
 			glog.V(logger.Info).Infof("fast sync complete, auto disabling")
-			pm.fastSync = false
+			atomic.StoreUint32(&pm.fastSync, 0)
 		}
 		}
 	}
 	}
 }
 }

+ 4 - 3
eth/sync_test.go

@@ -17,6 +17,7 @@
 package eth
 package eth
 
 
 import (
 import (
+	"sync/atomic"
 	"testing"
 	"testing"
 	"time"
 	"time"
 
 
@@ -29,12 +30,12 @@ import (
 func TestFastSyncDisabling(t *testing.T) {
 func TestFastSyncDisabling(t *testing.T) {
 	// Create a pristine protocol manager, check that fast sync is left enabled
 	// Create a pristine protocol manager, check that fast sync is left enabled
 	pmEmpty := newTestProtocolManagerMust(t, true, 0, nil, nil)
 	pmEmpty := newTestProtocolManagerMust(t, true, 0, nil, nil)
-	if !pmEmpty.fastSync {
+	if atomic.LoadUint32(&pmEmpty.fastSync) == 0 {
 		t.Fatalf("fast sync disabled on pristine blockchain")
 		t.Fatalf("fast sync disabled on pristine blockchain")
 	}
 	}
 	// Create a full protocol manager, check that fast sync gets disabled
 	// Create a full protocol manager, check that fast sync gets disabled
 	pmFull := newTestProtocolManagerMust(t, true, 1024, nil, nil)
 	pmFull := newTestProtocolManagerMust(t, true, 1024, nil, nil)
-	if pmFull.fastSync {
+	if atomic.LoadUint32(&pmFull.fastSync) == 1 {
 		t.Fatalf("fast sync not disabled on non-empty blockchain")
 		t.Fatalf("fast sync not disabled on non-empty blockchain")
 	}
 	}
 	// Sync up the two peers
 	// Sync up the two peers
@@ -47,7 +48,7 @@ func TestFastSyncDisabling(t *testing.T) {
 	pmEmpty.synchronise(pmEmpty.peers.BestPeer())
 	pmEmpty.synchronise(pmEmpty.peers.BestPeer())
 
 
 	// Check that fast sync was disabled
 	// Check that fast sync was disabled
-	if pmEmpty.fastSync {
+	if atomic.LoadUint32(&pmEmpty.fastSync) == 1 {
 		t.Fatalf("fast sync not disabled after successful synchronisation")
 		t.Fatalf("fast sync not disabled after successful synchronisation")
 	}
 	}
 }
 }