nodedb.go

// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package enode

import (
	"bytes"
	"crypto/rand"
	"encoding/binary"
	"fmt"
	"os"
	"sync"
	"time"

	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/rlp"
	"github.com/syndtr/goleveldb/leveldb"
	"github.com/syndtr/goleveldb/leveldb/errors"
	"github.com/syndtr/goleveldb/leveldb/iterator"
	"github.com/syndtr/goleveldb/leveldb/opt"
	"github.com/syndtr/goleveldb/leveldb/storage"
	"github.com/syndtr/goleveldb/leveldb/util"
)

var (
	nodeDBNilID          = ID{}           // Special node ID to use as a nil element.
	nodeDBNodeExpiration = 24 * time.Hour // Time after which an unseen node should be dropped.
	nodeDBCleanupCycle   = time.Hour      // Time period for running the expiration task.
	nodeDBVersion        = 6
)

// DB is the node database, storing previously seen nodes and any collected metadata about
// them for QoS purposes.
type DB struct {
	lvl    *leveldb.DB   // Interface to the database itself
	runner sync.Once     // Ensures we can start at most one expirer
	quit   chan struct{} // Channel to signal the expiring thread to stop
}

// Schema layout for the node database.
var (
	nodeDBVersionKey = []byte("version") // Version of the database, to flush if it changes
	nodeDBItemPrefix = []byte("n:")      // Identifier to prefix node entries with

	nodeDBDiscoverRoot      = ":discover"
	nodeDBDiscoverPing      = nodeDBDiscoverRoot + ":lastping"
	nodeDBDiscoverPong      = nodeDBDiscoverRoot + ":lastpong"
	nodeDBDiscoverFindFails = nodeDBDiscoverRoot + ":findfail"
)

// OpenDB opens a node database for storing and retrieving information about known
// peers in the network. If no path is given, an in-memory, temporary database is
// constructed.
func OpenDB(path string) (*DB, error) {
	if path == "" {
		return newMemoryDB()
	}
	return newPersistentDB(path)
}
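
// A minimal usage sketch (illustrative only, not part of this file): callers
// outside this package would go through enode.OpenDB. The empty-path form is
// handy in tests since it leaves nothing on disk; the path below is a
// hypothetical example.
//
//	db, err := OpenDB("") // in-memory, temporary database
//	if err != nil {
//		// handle error
//	}
//	defer db.Close()
//
//	persistent, err := OpenDB("/tmp/nodes") // leveldb-backed, survives restarts
//	_, _ = persistent, err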

// newMemoryDB creates a new in-memory node database without a persistent backend.
func newMemoryDB() (*DB, error) {
	db, err := leveldb.Open(storage.NewMemStorage(), nil)
	if err != nil {
		return nil, err
	}
	return &DB{lvl: db, quit: make(chan struct{})}, nil
}

// newPersistentDB creates/opens a leveldb backed persistent node database,
// also flushing its contents in case of a version mismatch.
func newPersistentDB(path string) (*DB, error) {
	opts := &opt.Options{OpenFilesCacheCapacity: 5}
	db, err := leveldb.OpenFile(path, opts)
	if _, iscorrupted := err.(*errors.ErrCorrupted); iscorrupted {
		db, err = leveldb.RecoverFile(path, nil)
	}
	if err != nil {
		return nil, err
	}
	// The nodes contained in the cache correspond to a certain protocol version.
	// Flush all nodes if the version doesn't match.
	currentVer := make([]byte, binary.MaxVarintLen64)
	currentVer = currentVer[:binary.PutVarint(currentVer, int64(nodeDBVersion))]

	blob, err := db.Get(nodeDBVersionKey, nil)
	switch err {
	case leveldb.ErrNotFound:
		// Version not found (i.e. empty cache), insert it
		if err := db.Put(nodeDBVersionKey, currentVer, nil); err != nil {
			db.Close()
			return nil, err
		}
	case nil:
		// Version present, flush if different
		if !bytes.Equal(blob, currentVer) {
			db.Close()
			if err = os.RemoveAll(path); err != nil {
				return nil, err
			}
			return newPersistentDB(path)
		}
	}
	return &DB{lvl: db, quit: make(chan struct{})}, nil
}

// makeKey generates the leveldb key-blob from a node id and its particular
// field of interest.
func makeKey(id ID, field string) []byte {
	if bytes.Equal(id[:], nodeDBNilID[:]) {
		return []byte(field)
	}
	return append(nodeDBItemPrefix, append(id[:], field...)...)
}

// splitKey tries to split a database key into a node id and a field part.
func splitKey(key []byte) (id ID, field string) {
	// If the key is not of a node, return it plainly
	if !bytes.HasPrefix(key, nodeDBItemPrefix) {
		return ID{}, string(key)
	}
	// Otherwise split the id and field
	item := key[len(nodeDBItemPrefix):]
	copy(id[:], item[:len(id)])
	field = string(item[len(id):])
	return id, field
}
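
// Key layout sketch (illustrative): per-node entries consist of the "n:"
// prefix, followed by the raw node ID bytes, followed by the field suffix,
// so e.g. the last-pong key for a node is
//
//	"n:" + id[:] + ":discover:lastpong"
//
// Keys built with the nil ID carry no node prefix and are just the field
// string, which is why splitKey hands such keys back as a bare field with an
// empty ID.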

// fetchInt64 retrieves an integer value associated with a particular
// database key.
func (db *DB) fetchInt64(key []byte) int64 {
	blob, err := db.lvl.Get(key, nil)
	if err != nil {
		return 0
	}
	val, read := binary.Varint(blob)
	if read <= 0 {
		return 0
	}
	return val
}

// storeInt64 stores an integer value under the given database key.
func (db *DB) storeInt64(key []byte, n int64) error {
	blob := make([]byte, binary.MaxVarintLen64)
	blob = blob[:binary.PutVarint(blob, n)]
	return db.lvl.Put(key, blob, nil)
}

// Node retrieves a node with a given id from the database.
func (db *DB) Node(id ID) *Node {
	blob, err := db.lvl.Get(makeKey(id, nodeDBDiscoverRoot), nil)
	if err != nil {
		return nil
	}
	return mustDecodeNode(id[:], blob)
}

// mustDecodeNode decodes a stored node record, panicking if the database
// contains invalid data.
func mustDecodeNode(id, data []byte) *Node {
	node := new(Node)
	if err := rlp.DecodeBytes(data, &node.r); err != nil {
		panic(fmt.Errorf("p2p/enode: can't decode node %x in DB: %v", id, err))
	}
	// Restore node id cache.
	copy(node.id[:], id)
	return node
}

// UpdateNode inserts - potentially overwriting - a node into the peer database.
func (db *DB) UpdateNode(node *Node) error {
	blob, err := rlp.EncodeToBytes(&node.r)
	if err != nil {
		return err
	}
	return db.lvl.Put(makeKey(node.ID(), nodeDBDiscoverRoot), blob, nil)
}

// DeleteNode deletes all information/keys associated with a node.
func (db *DB) DeleteNode(id ID) error {
	deleter := db.lvl.NewIterator(util.BytesPrefix(makeKey(id, "")), nil)
	for deleter.Next() {
		if err := db.lvl.Delete(deleter.Key(), nil); err != nil {
			return err
		}
	}
	return nil
}

// ensureExpirer is a small helper method ensuring that the data expiration
// mechanism is running. If the expiration goroutine is already running, this
// method simply returns.
//
// The goal is to start the data evacuation only after the network successfully
// bootstrapped itself (to prevent dumping potentially useful seed nodes). Since
// it would require significant overhead to exactly trace the first successful
// convergence, it's simpler to "ensure" the correct state when an appropriate
// condition occurs (i.e. a successful bonding), and discard further events.
func (db *DB) ensureExpirer() {
	db.runner.Do(func() { go db.expirer() })
}

// expirer should be started in a goroutine, and is responsible for looping ad
// infinitum and dropping stale data from the database.
func (db *DB) expirer() {
	tick := time.NewTicker(nodeDBCleanupCycle)
	defer tick.Stop()
	for {
		select {
		case <-tick.C:
			if err := db.expireNodes(); err != nil {
				log.Error("Failed to expire nodedb items", "err", err)
			}
		case <-db.quit:
			return
		}
	}
}

// expireNodes iterates over the database and deletes all nodes that have not
// been seen (i.e. no pong received from them) within the allotted time.
func (db *DB) expireNodes() error {
	threshold := time.Now().Add(-nodeDBNodeExpiration)

	// Find discovered nodes that are older than the allowance
	it := db.lvl.NewIterator(nil, nil)
	defer it.Release()

	for it.Next() {
		// Skip the item if not a discovery node
		id, field := splitKey(it.Key())
		if field != nodeDBDiscoverRoot {
			continue
		}
		// Skip the node if not expired yet (and not self)
		if seen := db.LastPongReceived(id); seen.After(threshold) {
			continue
		}
		// Otherwise delete all associated information
		db.DeleteNode(id)
	}
	return nil
}

// LastPingReceived retrieves the time of the last ping packet received from
// a remote node.
func (db *DB) LastPingReceived(id ID) time.Time {
	return time.Unix(db.fetchInt64(makeKey(id, nodeDBDiscoverPing)), 0)
}

// UpdateLastPingReceived updates the last time a ping packet was received from
// a remote node.
func (db *DB) UpdateLastPingReceived(id ID, instance time.Time) error {
	return db.storeInt64(makeKey(id, nodeDBDiscoverPing), instance.Unix())
}

// LastPongReceived retrieves the time of the last successful pong received from
// a remote node.
func (db *DB) LastPongReceived(id ID) time.Time {
	// Launch expirer
	db.ensureExpirer()
	return time.Unix(db.fetchInt64(makeKey(id, nodeDBDiscoverPong)), 0)
}

// UpdateLastPongReceived updates the last pong time of a node.
func (db *DB) UpdateLastPongReceived(id ID, instance time.Time) error {
	return db.storeInt64(makeKey(id, nodeDBDiscoverPong), instance.Unix())
}
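
// Bookkeeping sketch (illustrative): a discovery transport would typically
// record these timestamps as packets arrive and later consult them to decide
// whether a node needs re-validation. remoteID and the 24h threshold below are
// hypothetical example values, not taken from this file.
//
//	db.UpdateLastPingReceived(remoteID, time.Now()) // on an incoming ping
//	db.UpdateLastPongReceived(remoteID, time.Now()) // on a verified pong reply
//	if time.Since(db.LastPongReceived(remoteID)) > 24*time.Hour {
//		// last proof of liveness is stale; re-ping before trusting the node
//	}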

// FindFails retrieves the number of findnode failures since bonding.
func (db *DB) FindFails(id ID) int {
	return int(db.fetchInt64(makeKey(id, nodeDBDiscoverFindFails)))
}

// UpdateFindFails updates the number of findnode failures since bonding.
func (db *DB) UpdateFindFails(id ID, fails int) error {
	return db.storeInt64(makeKey(id, nodeDBDiscoverFindFails), int64(fails))
}

// QuerySeeds retrieves random nodes to be used as potential seed nodes
// for bootstrapping.
func (db *DB) QuerySeeds(n int, maxAge time.Duration) []*Node {
	var (
		now   = time.Now()
		nodes = make([]*Node, 0, n)
		it    = db.lvl.NewIterator(nil, nil)
		id    ID
	)
	defer it.Release()

seek:
	for seeks := 0; len(nodes) < n && seeks < n*5; seeks++ {
		// Seek to a random entry. The first byte is incremented by a
		// random amount each time in order to increase the likelihood
		// of hitting all existing nodes in very small databases.
		ctr := id[0]
		rand.Read(id[:])
		id[0] = ctr + id[0]%16
		it.Seek(makeKey(id, nodeDBDiscoverRoot))

		n := nextNode(it)
		if n == nil {
			id[0] = 0
			continue seek // iterator exhausted
		}
		if now.Sub(db.LastPongReceived(n.ID())) > maxAge {
			continue seek
		}
		for i := range nodes {
			if nodes[i].ID() == n.ID() {
				continue seek // duplicate
			}
		}
		nodes = append(nodes, n)
	}
	return nodes
}
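
// Bootstrapping sketch (illustrative): a discovery table refilling its buckets
// might ask for a handful of reasonably fresh seeds and dial each one. The
// numbers here are arbitrary examples, not values defined in this file.
//
//	seeds := db.QuerySeeds(30, 5*24*time.Hour)
//	for _, seed := range seeds {
//		// ping or dial each seed to rejoin the network
//		_ = seed
//	}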

// nextNode reads the next node record from the iterator, skipping over other
// database entries.
func nextNode(it iterator.Iterator) *Node {
	for end := false; !end; end = !it.Next() {
		id, field := splitKey(it.Key())
		if field != nodeDBDiscoverRoot {
			continue
		}
		return mustDecodeNode(id[:], it.Value())
	}
	return nil
}

// Close flushes and closes the database files.
func (db *DB) Close() {
	close(db.quit)
	db.lvl.Close()
}