Browse Source

cmd/bootnode, eth, p2p, p2p/discover: use a fancier db design

Péter Szilágyi 10 years ago
parent
commit
8646365b42

+ 1 - 1
cmd/bootnode/main.go

@@ -71,7 +71,7 @@ func main() {
 		}
 	}
 
-	if _, err := discover.ListenUDP(nodeKey, *listenAddr, natm, nil); err != nil {
+	if _, err := discover.ListenUDP(nodeKey, *listenAddr, natm, ""); err != nil {
 		log.Fatal(err)
 	}
 	select {}

+ 2 - 8
eth/backend.go

@@ -125,7 +125,6 @@ type Ethereum struct {
 	blockDb common.Database // Block chain database
 	stateDb common.Database // State changes database
 	extraDb common.Database // Extra database (txs, etc)
-	seedDb  *discover.Cache // Peer database seeding the bootstrap
 
 	// Closed when databases are flushed and closed
 	databasesClosed chan bool
@@ -181,10 +180,7 @@ func New(config *Config) (*Ethereum, error) {
 	if err != nil {
 		return nil, err
 	}
-	seedDb, err := discover.NewPersistentCache(path.Join(config.DataDir, "seeds"))
-	if err != nil {
-		return nil, err
-	}
+	nodeDb := path.Join(config.DataDir, "nodes")
 
 	// Perform database sanity checks
 	d, _ := blockDb.Get([]byte("ProtocolVersion"))
@@ -212,7 +208,6 @@ func New(config *Config) (*Ethereum, error) {
 		blockDb:         blockDb,
 		stateDb:         stateDb,
 		extraDb:         extraDb,
-		seedDb:          seedDb,
 		eventMux:        &event.TypeMux{},
 		accountManager:  config.AccountManager,
 		DataDir:         config.DataDir,
@@ -250,7 +245,7 @@ func New(config *Config) (*Ethereum, error) {
 		NAT:            config.NAT,
 		NoDial:         !config.Dial,
 		BootstrapNodes: config.parseBootNodes(),
-		SeedCache:      seedDb,
+		NodeDatabase:   nodeDb,
 	}
 	if len(config.Port) > 0 {
 		eth.net.ListenAddr = ":" + config.Port
@@ -429,7 +424,6 @@ done:
 		}
 	}
 
-	s.seedDb.Close()
 	s.blockDb.Close()
 	s.stateDb.Close()
 	s.extraDb.Close()

+ 0 - 134
p2p/discover/cache.go

@@ -1,134 +0,0 @@
-// Contains the discovery cache, storing previously seen nodes to act as seed
-// servers during bootstrapping the network.
-
-package discover
-
-import (
-	"bytes"
-	"encoding/binary"
-	"net"
-	"os"
-
-	"github.com/ethereum/go-ethereum/rlp"
-	"github.com/syndtr/goleveldb/leveldb"
-	"github.com/syndtr/goleveldb/leveldb/storage"
-)
-
-// Cache stores all nodes we know about.
-type Cache struct {
-	db *leveldb.DB
-}
-
-// Cache version to allow dumping old data if it changes.
-var cacheVersionKey = []byte("pv")
-
-// NewMemoryCache creates a new in-memory peer cache without a persistent backend.
-func NewMemoryCache() (*Cache, error) {
-	db, err := leveldb.Open(storage.NewMemStorage(), nil)
-	if err != nil {
-		return nil, err
-	}
-	return &Cache{db: db}, nil
-}
-
-// NewPersistentCache creates/opens a leveldb backed persistent peer cache, also
-// flushing its contents in case of a version mismatch.
-func NewPersistentCache(path string) (*Cache, error) {
-	// Try to open the cache, recovering any corruption
-	db, err := leveldb.OpenFile(path, nil)
-	if _, iscorrupted := err.(leveldb.ErrCorrupted); iscorrupted {
-		db, err = leveldb.RecoverFile(path, nil)
-	}
-	if err != nil {
-		return nil, err
-	}
-	// The nodes contained in the cache correspond to a certain protocol version.
-	// Flush all nodes if the version doesn't match.
-	currentVer := make([]byte, binary.MaxVarintLen64)
-	currentVer = currentVer[:binary.PutVarint(currentVer, Version)]
-
-	blob, err := db.Get(cacheVersionKey, nil)
-	switch err {
-	case leveldb.ErrNotFound:
-		// Version not found (i.e. empty cache), insert it
-		err = db.Put(cacheVersionKey, currentVer, nil)
-
-	case nil:
-		// Version present, flush if different
-		if !bytes.Equal(blob, currentVer) {
-			db.Close()
-			if err = os.RemoveAll(path); err != nil {
-				return nil, err
-			}
-			return NewPersistentCache(path)
-		}
-	}
-	// Clean up in case of an error
-	if err != nil {
-		db.Close()
-		return nil, err
-	}
-	return &Cache{db: db}, nil
-}
-
-// get retrieves a node with a given id from the seed da
-func (c *Cache) get(id NodeID) *Node {
-	blob, err := c.db.Get(id[:], nil)
-	if err != nil {
-		return nil
-	}
-	node := new(Node)
-	if err := rlp.DecodeBytes(blob, node); err != nil {
-		return nil
-	}
-	return node
-}
-
-// list retrieves a batch of nodes from the database.
-func (c *Cache) list(n int) []*Node {
-	it := c.db.NewIterator(nil, nil)
-	defer it.Release()
-
-	nodes := make([]*Node, 0, n)
-	for i := 0; i < n && it.Next(); i++ {
-		var id NodeID
-		copy(id[:], it.Key())
-
-		if node := c.get(id); node != nil {
-			nodes = append(nodes, node)
-		}
-	}
-	return nodes
-}
-
-// update inserts - potentially overwriting - a node in the seed database.
-func (c *Cache) update(node *Node) error {
-	blob, err := rlp.EncodeToBytes(node)
-	if err != nil {
-		return err
-	}
-	return c.db.Put(node.ID[:], blob, nil)
-}
-
-// add inserts a new node into the seed database.
-func (c *Cache) add(id NodeID, addr *net.UDPAddr, tcpPort uint16) *Node {
-	node := &Node{
-		ID:       id,
-		IP:       addr.IP,
-		DiscPort: addr.Port,
-		TCPPort:  int(tcpPort),
-	}
-	c.update(node)
-
-	return node
-}
-
-// delete removes a node from the database.
-func (c *Cache) delete(id NodeID) error {
-	return c.db.Delete(id[:], nil)
-}
-
-// Close flushes and closes the database files.
-func (c *Cache) Close() {
-	c.db.Close()
-}

+ 233 - 0
p2p/discover/database.go

@@ -0,0 +1,233 @@
+// Contains the node database, storing previously seen nodes and any collected
+// metadata about them for QoS purposes.
+
+package discover
+
+import (
+	"bytes"
+	"encoding/binary"
+	"os"
+	"time"
+
+	"github.com/ethereum/go-ethereum/rlp"
+	"github.com/syndtr/goleveldb/leveldb"
+	"github.com/syndtr/goleveldb/leveldb/storage"
+)
+
+// nodeDB stores all nodes we know about.
+type nodeDB struct {
+	lvl *leveldb.DB
+}
+
+// Schema layout for the node database
+var (
+	nodeDBVersionKey = []byte("version") // Version of the database to flush if changes
+	nodeDBStartupKey = []byte("startup") // Time when the node discovery started (seed selection)
+	nodeDBItemPrefix = []byte("n:")      // Identifier to prefix node entries with
+
+	nodeDBDiscoverRoot = ":discover"
+	nodeDBDiscoverPing = nodeDBDiscoverRoot + ":lastping"
+	nodeDBDiscoverBond = nodeDBDiscoverRoot + ":lastbond"
+)
+
+// newNodeDB creates a new node database for storing and retrieving information
+// about known peers in the network. If no path is given, an in-memory,
+// temporary database is constructed.
+func newNodeDB(path string) (*nodeDB, error) {
+	if path == "" {
+		return newMemoryNodeDB()
+	}
+	return newPersistentNodeDB(path)
+}
+
+// newMemoryNodeDB creates a new in-memory node database without a persistent
+// backend.
+func newMemoryNodeDB() (*nodeDB, error) {
+	db, err := leveldb.Open(storage.NewMemStorage(), nil)
+	if err != nil {
+		return nil, err
+	}
+	return &nodeDB{lvl: db}, nil
+}
+
+// newPersistentNodeDB creates/opens a leveldb backed persistent node database,
+// also flushing its contents in case of a version mismatch.
+func newPersistentNodeDB(path string) (*nodeDB, error) {
+	// Try to open the database, recovering from any corruption
+	db, err := leveldb.OpenFile(path, nil)
+	if _, iscorrupted := err.(leveldb.ErrCorrupted); iscorrupted {
+		db, err = leveldb.RecoverFile(path, nil)
+	}
+	if err != nil {
+		return nil, err
+	}
+	// The nodes contained in the database correspond to a certain protocol
+	// version. Flush all nodes if the version doesn't match.
+	currentVer := make([]byte, binary.MaxVarintLen64)
+	currentVer = currentVer[:binary.PutVarint(currentVer, Version)]
+
+	blob, err := db.Get(nodeDBVersionKey, nil)
+	switch err {
+	case leveldb.ErrNotFound:
+		// Version not found (i.e. empty database), insert it
+		err = db.Put(nodeDBVersionKey, currentVer, nil)
+
+	case nil:
+		// Version present, flush if different
+		if !bytes.Equal(blob, currentVer) {
+			db.Close()
+			if err = os.RemoveAll(path); err != nil {
+				return nil, err
+			}
+			return newPersistentNodeDB(path)
+		}
+	}
+	// Clean up in case of an error
+	if err != nil {
+		db.Close()
+		return nil, err
+	}
+	return &nodeDB{lvl: db}, nil
+}
+
+// key generates the leveldb key-blob from a node id and its particular field of
+// interest.
+func (db *nodeDB) key(id NodeID, field string) []byte {
+	return append(nodeDBItemPrefix, append(id[:], field...)...)
+}
+
+// splitKey tries to split a database key into a node id and a field part.
+func (db *nodeDB) splitKey(key []byte) (id NodeID, field string) {
+	// If the key is not of a node, return it plainly
+	if !bytes.HasPrefix(key, nodeDBItemPrefix) {
+		return NodeID{}, string(key)
+	}
+	// Otherwise split the id and field
+	item := key[len(nodeDBItemPrefix):]
+	copy(id[:], item[:len(id)])
+	field = string(item[len(id):])
+
+	return id, field
+}
+
+// fetchTime retrieves a time instance (encoded as a unix timestamp) associated
+// with a particular database key.
+func (db *nodeDB) fetchTime(key []byte) time.Time {
+	blob, err := db.lvl.Get(key, nil)
+	if err != nil {
+		return time.Time{}
+	}
+	var unix int64
+	if err := rlp.DecodeBytes(blob, &unix); err != nil {
+		return time.Time{}
+	}
+	return time.Unix(unix, 0)
+}
+
+// storeTime updates a specific database entry to the given time instance,
+// encoded as a unix timestamp.
+func (db *nodeDB) storeTime(key []byte, instance time.Time) error {
+	blob, err := rlp.EncodeToBytes(instance.Unix())
+	if err != nil {
+		return err
+	}
+	return db.lvl.Put(key, blob, nil)
+}
+
+// startup retrieves the time instance when the bootstrapping last began. Its
+// purpose is to prevent contacting potential seed nodes multiple times in the
+// same boot cycle.
+func (db *nodeDB) startup() time.Time {
+	return db.fetchTime(nodeDBStartupKey)
+}
+
+// updateStartup updates the bootstrap initiation time to the one specified.
+func (db *nodeDB) updateStartup(instance time.Time) error {
+	return db.storeTime(nodeDBStartupKey, instance)
+}
+
+// node retrieves a node with a given id from the database.
+func (db *nodeDB) node(id NodeID) *Node {
+	blob, err := db.lvl.Get(db.key(id, nodeDBDiscoverRoot), nil)
+	if err != nil {
+		return nil
+	}
+	node := new(Node)
+	if err := rlp.DecodeBytes(blob, node); err != nil {
+		return nil
+	}
+	return node
+}
+
+// updateNode inserts - potentially overwriting - a node into the peer database.
+func (db *nodeDB) updateNode(node *Node) error {
+	blob, err := rlp.EncodeToBytes(node)
+	if err != nil {
+		return err
+	}
+	return db.lvl.Put(db.key(node.ID, nodeDBDiscoverRoot), blob, nil)
+}
+
+// lastPing retrieves the time of the last ping packet sent to a remote node,
+// requesting binding.
+func (db *nodeDB) lastPing(id NodeID) time.Time {
+	return db.fetchTime(db.key(id, nodeDBDiscoverPing))
+}
+
+// updateLastPing updates the last time we tried contacting a remote node.
+func (db *nodeDB) updateLastPing(id NodeID, instance time.Time) error {
+	return db.storeTime(db.key(id, nodeDBDiscoverPing), instance)
+}
+
+// lastBond retrieves the time of the last successful bonding with a remote node.
+func (db *nodeDB) lastBond(id NodeID) time.Time {
+	return db.fetchTime(db.key(id, nodeDBDiscoverBond))
+}
+
+// updateLastBond updates the last time we successfully bound to a remote node.
+func (db *nodeDB) updateLastBond(id NodeID, instance time.Time) error {
+	return db.storeTime(db.key(id, nodeDBDiscoverBond), instance)
+}
+
+// querySeeds retrieves a batch of nodes to be used as potential seed servers
+// during bootstrapping the node into the network.
+//
+// Ideal seeds are the most recently seen nodes (highest probability of still
+// being alive) that are as yet untried. However, since leveldb only supports
+// dumb iteration, we instead pull in potential seeds that have not yet been
+// pinged since the start of the boot procedure.
+//
+// If the database runs out of potential seeds, we restart the startup counter
+// and start iterating over the peers again.
+func (db *nodeDB) querySeeds(n int) []*Node {
+	startup := db.startup()
+
+	it := db.lvl.NewIterator(nil, nil)
+	defer it.Release()
+
+	nodes := make([]*Node, 0, n)
+	for len(nodes) < n && it.Next() {
+		// Iterate until a discovery node is found
+		id, field := db.splitKey(it.Key())
+		if field != nodeDBDiscoverRoot {
+			continue
+		}
+		// Retrieve the last ping time, and if older than startup, query
+		lastPing := db.lastPing(id)
+		if lastPing.Before(startup) {
+			if node := db.node(id); node != nil {
+				nodes = append(nodes, node)
+			}
+		}
+	}
+	// Reset the startup time if no seeds were found
+	if len(nodes) == 0 {
+		db.updateStartup(time.Now())
+	}
+	return nodes
+}
+
+// close flushes and closes the database files.
+func (db *nodeDB) close() {
+	db.lvl.Close()
+}

+ 25 - 11
p2p/discover/table.go

@@ -27,7 +27,7 @@ type Table struct {
 	mutex   sync.Mutex        // protects buckets, their content, and nursery
 	buckets [nBuckets]*bucket // index of known nodes by distance
 	nursery []*Node           // bootstrap nodes
-	cache   *Cache            // cache of known nodes
+	db      *nodeDB           // database of known nodes
 
 	bondmu    sync.Mutex
 	bonding   map[NodeID]*bondproc
@@ -61,15 +61,17 @@ type bucket struct {
 	entries    []*Node
 }
 
-func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr, seeder *Cache) *Table {
+func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr, nodeDBPath string) *Table {
 	// If no seed cache was given, use an in-memory one
-	if seeder == nil {
-		seeder, _ = NewMemoryCache()
+	db, err := newNodeDB(nodeDBPath)
+	if err != nil {
+		glog.V(logger.Warn).Infoln("Failed to open node database:", err)
+		db, _ = newNodeDB("")
 	}
 	// Create the bootstrap table
 	tab := &Table{
 		net:       t,
-		cache:     seeder,
+		db:        db,
 		self:      newNode(ourID, ourAddr),
 		bonding:   make(map[NodeID]*bondproc),
 		bondslots: make(chan struct{}, maxBondingPingPongs),
@@ -91,6 +93,7 @@ func (tab *Table) Self() *Node {
 // Close terminates the network listener and flushes the seed cache.
 func (tab *Table) Close() {
 	tab.net.close()
+	tab.db.close()
 }
 
 // Bootstrap sets the bootstrap nodes. These nodes are used to connect
@@ -174,11 +177,10 @@ func (tab *Table) refresh() {
 
 	result := tab.Lookup(randomID(tab.self.ID, ld))
 	if len(result) == 0 {
-		// Pick a batch of previously know seeds to lookup with and discard them (will come back if they are still live)
-		seeds := tab.cache.list(10)
+		// Pick a batch of previously known seeds to lookup with
+		seeds := tab.db.querySeeds(10)
 		for _, seed := range seeds {
-			glog.V(logger.Debug).Infoln("Seeding network with:", seed)
-			tab.cache.delete(seed.ID)
+			glog.V(logger.Debug).Infoln("Seeding network with", seed)
 		}
 		// Bootstrap the table with a self lookup
 		all := tab.bondall(append(tab.nursery, seeds...))
@@ -249,7 +251,7 @@ func (tab *Table) bondall(nodes []*Node) (result []*Node) {
 // of the process can be skipped.
 func (tab *Table) bond(pinged bool, id NodeID, addr *net.UDPAddr, tcpPort uint16) (*Node, error) {
 	var n *Node
-	if n = tab.cache.get(id); n == nil {
+	if n = tab.db.node(id); n == nil {
 		tab.bondmu.Lock()
 		w := tab.bonding[id]
 		if w != nil {
@@ -282,8 +284,12 @@ func (tab *Table) bond(pinged bool, id NodeID, addr *net.UDPAddr, tcpPort uint16
 }
 
 func (tab *Table) pingpong(w *bondproc, pinged bool, id NodeID, addr *net.UDPAddr, tcpPort uint16) {
+	// Request a bonding slot to limit network usage
 	<-tab.bondslots
 	defer func() { tab.bondslots <- struct{}{} }()
+
+	// Ping the remote side and wait for a pong
+	tab.db.updateLastPing(id, time.Now())
 	if w.err = tab.net.ping(id, addr); w.err != nil {
 		close(w.done)
 		return
@@ -294,7 +300,15 @@ func (tab *Table) pingpong(w *bondproc, pinged bool, id NodeID, addr *net.UDPAdd
 		// waitping will simply time out.
 		tab.net.waitping(id)
 	}
-	w.n = tab.cache.add(id, addr, tcpPort)
+	// Bonding succeeded, update the node database
+	w.n = &Node{
+		ID:       id,
+		IP:       addr.IP,
+		DiscPort: addr.Port,
+		TCPPort:  int(tcpPort),
+	}
+	tab.db.updateNode(w.n)
+	tab.db.updateLastBond(id, time.Now())
 	close(w.done)
 }
 

+ 3 - 3
p2p/discover/table_test.go

@@ -15,7 +15,7 @@ import (
 func TestTable_pingReplace(t *testing.T) {
 	doit := func(newNodeIsResponding, lastInBucketIsResponding bool) {
 		transport := newPingRecorder()
-		tab := newTable(transport, NodeID{}, &net.UDPAddr{}, nil)
+		tab := newTable(transport, NodeID{}, &net.UDPAddr{}, "")
 		last := fillBucket(tab, 200)
 		pingSender := randomID(tab.self.ID, 200)
 
@@ -145,7 +145,7 @@ func TestTable_closest(t *testing.T) {
 
 	test := func(test *closeTest) bool {
 		// for any node table, Target and N
-		tab := newTable(nil, test.Self, &net.UDPAddr{}, nil)
+		tab := newTable(nil, test.Self, &net.UDPAddr{}, "")
 		tab.add(test.All)
 
 		// check that doClosest(Target, N) returns nodes
@@ -217,7 +217,7 @@ func TestTable_Lookup(t *testing.T) {
 	self := gen(NodeID{}, quickrand).(NodeID)
 	target := randomID(self, 200)
 	transport := findnodeOracle{t, target}
-	tab := newTable(transport, self, &net.UDPAddr{}, nil)
+	tab := newTable(transport, self, &net.UDPAddr{}, "")
 
 	// lookup on empty table returns no nodes
 	if results := tab.Lookup(target); len(results) > 0 {

+ 5 - 5
p2p/discover/udp.go

@@ -144,7 +144,7 @@ type reply struct {
 }
 
 // ListenUDP returns a new table that listens for UDP packets on laddr.
-func ListenUDP(priv *ecdsa.PrivateKey, laddr string, natm nat.Interface, seeder *Cache) (*Table, error) {
+func ListenUDP(priv *ecdsa.PrivateKey, laddr string, natm nat.Interface, nodeDBPath string) (*Table, error) {
 	addr, err := net.ResolveUDPAddr("udp", laddr)
 	if err != nil {
 		return nil, err
@@ -153,12 +153,12 @@ func ListenUDP(priv *ecdsa.PrivateKey, laddr string, natm nat.Interface, seeder
 	if err != nil {
 		return nil, err
 	}
-	tab, _ := newUDP(priv, conn, natm, seeder)
+	tab, _ := newUDP(priv, conn, natm, nodeDBPath)
 	glog.V(logger.Info).Infoln("Listening,", tab.self)
 	return tab, nil
 }
 
-func newUDP(priv *ecdsa.PrivateKey, c conn, natm nat.Interface, seeder *Cache) (*Table, *udp) {
+func newUDP(priv *ecdsa.PrivateKey, c conn, natm nat.Interface, nodeDBPath string) (*Table, *udp) {
 	udp := &udp{
 		conn:       c,
 		priv:       priv,
@@ -176,7 +176,7 @@ func newUDP(priv *ecdsa.PrivateKey, c conn, natm nat.Interface, seeder *Cache) (
 			realaddr = &net.UDPAddr{IP: ext, Port: realaddr.Port}
 		}
 	}
-	udp.Table = newTable(udp, PubkeyID(&priv.PublicKey), realaddr, seeder)
+	udp.Table = newTable(udp, PubkeyID(&priv.PublicKey), realaddr, nodeDBPath)
 	go udp.loop()
 	go udp.readLoop()
 	return udp.Table, udp
@@ -449,7 +449,7 @@ func (req *findnode) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte
 	if expired(req.Expiration) {
 		return errExpired
 	}
-	if t.cache.get(fromID) == nil {
+	if t.db.node(fromID) == nil {
 		// No bond exists, we don't process the packet. This prevents
 		// an attack vector where the discovery protocol could be used
 		// to amplify traffic in a DDOS attack. A malicious actor

+ 7 - 3
p2p/discover/udp_test.go

@@ -41,7 +41,7 @@ func newUDPTest(t *testing.T) *udpTest {
 		remotekey:  newkey(),
 		remoteaddr: &net.UDPAddr{IP: net.IP{1, 2, 3, 4}, Port: 30303},
 	}
-	test.table, test.udp = newUDP(test.localkey, test.pipe, nil, nil)
+	test.table, test.udp = newUDP(test.localkey, test.pipe, nil, "")
 	return test
 }
 
@@ -157,8 +157,12 @@ func TestUDP_findnode(t *testing.T) {
 
 	// ensure there's a bond with the test node,
 	// findnode won't be accepted otherwise.
-	test.table.cache.add(PubkeyID(&test.remotekey.PublicKey), test.remoteaddr, 99)
-
+	test.table.db.updateNode(&Node{
+		ID:       PubkeyID(&test.remotekey.PublicKey),
+		IP:       test.remoteaddr.IP,
+		DiscPort: test.remoteaddr.Port,
+		TCPPort:  99,
+	})
 	// check that closest neighbors are returned.
 	test.packetIn(nil, findnodePacket, &findnode{Target: testTarget, Expiration: futureExp})
 	test.waitPacketOut(func(p *neighbors) {

+ 4 - 4
p2p/server.go

@@ -59,9 +59,9 @@ type Server struct {
 	// with the rest of the network.
 	BootstrapNodes []*discover.Node
 
-	// SeedCache is the database containing the previously seen live nodes in
-	// the network to use as potential bootstrap seeds.
-	SeedCache *discover.Cache
+	// NodeDatabase is the path to the database containing the previously seen
+	// live nodes in the network.
+	NodeDatabase string
 
 	// Protocols should contain the protocols supported
 	// by the server. Matching protocols are launched for
@@ -201,7 +201,7 @@ func (srv *Server) Start() (err error) {
 	}
 
 	// node table
-	ntab, err := discover.ListenUDP(srv.PrivateKey, srv.ListenAddr, srv.NAT, srv.SeedCache)
+	ntab, err := discover.ListenUDP(srv.PrivateKey, srv.ListenAddr, srv.NAT, srv.NodeDatabase)
 	if err != nil {
 		return err
 	}