nodedb.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395
  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package enode
  17. import (
  18. "bytes"
  19. "crypto/rand"
  20. "encoding/binary"
  21. "fmt"
  22. "os"
  23. "sync"
  24. "time"
  25. "github.com/ethereum/go-ethereum/log"
  26. "github.com/ethereum/go-ethereum/rlp"
  27. "github.com/syndtr/goleveldb/leveldb"
  28. "github.com/syndtr/goleveldb/leveldb/errors"
  29. "github.com/syndtr/goleveldb/leveldb/iterator"
  30. "github.com/syndtr/goleveldb/leveldb/opt"
  31. "github.com/syndtr/goleveldb/leveldb/storage"
  32. "github.com/syndtr/goleveldb/leveldb/util"
  33. )
  34. // Keys in the node database.
  35. const (
  36. dbVersionKey = "version" // Version of the database to flush if changes
  37. dbItemPrefix = "n:" // Identifier to prefix node entries with
  38. dbDiscoverRoot = ":discover"
  39. dbDiscoverSeq = dbDiscoverRoot + ":seq"
  40. dbDiscoverPing = dbDiscoverRoot + ":lastping"
  41. dbDiscoverPong = dbDiscoverRoot + ":lastpong"
  42. dbDiscoverFindFails = dbDiscoverRoot + ":findfail"
  43. dbLocalRoot = ":local"
  44. dbLocalSeq = dbLocalRoot + ":seq"
  45. )
  46. var (
  47. dbNodeExpiration = 24 * time.Hour // Time after which an unseen node should be dropped.
  48. dbCleanupCycle = time.Hour // Time period for running the expiration task.
  49. dbVersion = 7
  50. )
  51. // DB is the node database, storing previously seen nodes and any collected metadata about
  52. // them for QoS purposes.
  53. type DB struct {
  54. lvl *leveldb.DB // Interface to the database itself
  55. runner sync.Once // Ensures we can start at most one expirer
  56. quit chan struct{} // Channel to signal the expiring thread to stop
  57. }
  58. // OpenDB opens a node database for storing and retrieving infos about known peers in the
  59. // network. If no path is given an in-memory, temporary database is constructed.
  60. func OpenDB(path string) (*DB, error) {
  61. if path == "" {
  62. return newMemoryDB()
  63. }
  64. return newPersistentDB(path)
  65. }
  66. // newMemoryNodeDB creates a new in-memory node database without a persistent backend.
  67. func newMemoryDB() (*DB, error) {
  68. db, err := leveldb.Open(storage.NewMemStorage(), nil)
  69. if err != nil {
  70. return nil, err
  71. }
  72. return &DB{lvl: db, quit: make(chan struct{})}, nil
  73. }
  74. // newPersistentNodeDB creates/opens a leveldb backed persistent node database,
  75. // also flushing its contents in case of a version mismatch.
  76. func newPersistentDB(path string) (*DB, error) {
  77. opts := &opt.Options{OpenFilesCacheCapacity: 5}
  78. db, err := leveldb.OpenFile(path, opts)
  79. if _, iscorrupted := err.(*errors.ErrCorrupted); iscorrupted {
  80. db, err = leveldb.RecoverFile(path, nil)
  81. }
  82. if err != nil {
  83. return nil, err
  84. }
  85. // The nodes contained in the cache correspond to a certain protocol version.
  86. // Flush all nodes if the version doesn't match.
  87. currentVer := make([]byte, binary.MaxVarintLen64)
  88. currentVer = currentVer[:binary.PutVarint(currentVer, int64(dbVersion))]
  89. blob, err := db.Get([]byte(dbVersionKey), nil)
  90. switch err {
  91. case leveldb.ErrNotFound:
  92. // Version not found (i.e. empty cache), insert it
  93. if err := db.Put([]byte(dbVersionKey), currentVer, nil); err != nil {
  94. db.Close()
  95. return nil, err
  96. }
  97. case nil:
  98. // Version present, flush if different
  99. if !bytes.Equal(blob, currentVer) {
  100. db.Close()
  101. if err = os.RemoveAll(path); err != nil {
  102. return nil, err
  103. }
  104. return newPersistentDB(path)
  105. }
  106. }
  107. return &DB{lvl: db, quit: make(chan struct{})}, nil
  108. }
  109. // makeKey generates the leveldb key-blob from a node id and its particular
  110. // field of interest.
  111. func makeKey(id ID, field string) []byte {
  112. if (id == ID{}) {
  113. return []byte(field)
  114. }
  115. return append([]byte(dbItemPrefix), append(id[:], field...)...)
  116. }
  117. // splitKey tries to split a database key into a node id and a field part.
  118. func splitKey(key []byte) (id ID, field string) {
  119. // If the key is not of a node, return it plainly
  120. if !bytes.HasPrefix(key, []byte(dbItemPrefix)) {
  121. return ID{}, string(key)
  122. }
  123. // Otherwise split the id and field
  124. item := key[len(dbItemPrefix):]
  125. copy(id[:], item[:len(id)])
  126. field = string(item[len(id):])
  127. return id, field
  128. }
  129. // fetchInt64 retrieves an integer associated with a particular key.
  130. func (db *DB) fetchInt64(key []byte) int64 {
  131. blob, err := db.lvl.Get(key, nil)
  132. if err != nil {
  133. return 0
  134. }
  135. val, read := binary.Varint(blob)
  136. if read <= 0 {
  137. return 0
  138. }
  139. return val
  140. }
  141. // storeInt64 stores an integer in the given key.
  142. func (db *DB) storeInt64(key []byte, n int64) error {
  143. blob := make([]byte, binary.MaxVarintLen64)
  144. blob = blob[:binary.PutVarint(blob, n)]
  145. return db.lvl.Put(key, blob, nil)
  146. }
  147. // fetchUint64 retrieves an integer associated with a particular key.
  148. func (db *DB) fetchUint64(key []byte) uint64 {
  149. blob, err := db.lvl.Get(key, nil)
  150. if err != nil {
  151. return 0
  152. }
  153. val, _ := binary.Uvarint(blob)
  154. return val
  155. }
  156. // storeUint64 stores an integer in the given key.
  157. func (db *DB) storeUint64(key []byte, n uint64) error {
  158. blob := make([]byte, binary.MaxVarintLen64)
  159. blob = blob[:binary.PutUvarint(blob, n)]
  160. return db.lvl.Put(key, blob, nil)
  161. }
  162. // Node retrieves a node with a given id from the database.
  163. func (db *DB) Node(id ID) *Node {
  164. blob, err := db.lvl.Get(makeKey(id, dbDiscoverRoot), nil)
  165. if err != nil {
  166. return nil
  167. }
  168. return mustDecodeNode(id[:], blob)
  169. }
  170. func mustDecodeNode(id, data []byte) *Node {
  171. node := new(Node)
  172. if err := rlp.DecodeBytes(data, &node.r); err != nil {
  173. panic(fmt.Errorf("p2p/enode: can't decode node %x in DB: %v", id, err))
  174. }
  175. // Restore node id cache.
  176. copy(node.id[:], id)
  177. return node
  178. }
  179. // UpdateNode inserts - potentially overwriting - a node into the peer database.
  180. func (db *DB) UpdateNode(node *Node) error {
  181. if node.Seq() < db.NodeSeq(node.ID()) {
  182. return nil
  183. }
  184. blob, err := rlp.EncodeToBytes(&node.r)
  185. if err != nil {
  186. return err
  187. }
  188. if err := db.lvl.Put(makeKey(node.ID(), dbDiscoverRoot), blob, nil); err != nil {
  189. return err
  190. }
  191. return db.storeUint64(makeKey(node.ID(), dbDiscoverSeq), node.Seq())
  192. }
  193. // NodeSeq returns the stored record sequence number of the given node.
  194. func (db *DB) NodeSeq(id ID) uint64 {
  195. return db.fetchUint64(makeKey(id, dbDiscoverSeq))
  196. }
  197. // Resolve returns the stored record of the node if it has a larger sequence
  198. // number than n.
  199. func (db *DB) Resolve(n *Node) *Node {
  200. if n.Seq() > db.NodeSeq(n.ID()) {
  201. return n
  202. }
  203. return db.Node(n.ID())
  204. }
  205. // DeleteNode deletes all information/keys associated with a node.
  206. func (db *DB) DeleteNode(id ID) error {
  207. deleter := db.lvl.NewIterator(util.BytesPrefix(makeKey(id, "")), nil)
  208. for deleter.Next() {
  209. if err := db.lvl.Delete(deleter.Key(), nil); err != nil {
  210. return err
  211. }
  212. }
  213. return nil
  214. }
  215. // ensureExpirer is a small helper method ensuring that the data expiration
  216. // mechanism is running. If the expiration goroutine is already running, this
  217. // method simply returns.
  218. //
  219. // The goal is to start the data evacuation only after the network successfully
  220. // bootstrapped itself (to prevent dumping potentially useful seed nodes). Since
  221. // it would require significant overhead to exactly trace the first successful
  222. // convergence, it's simpler to "ensure" the correct state when an appropriate
  223. // condition occurs (i.e. a successful bonding), and discard further events.
  224. func (db *DB) ensureExpirer() {
  225. db.runner.Do(func() { go db.expirer() })
  226. }
  227. // expirer should be started in a go routine, and is responsible for looping ad
  228. // infinitum and dropping stale data from the database.
  229. func (db *DB) expirer() {
  230. tick := time.NewTicker(dbCleanupCycle)
  231. defer tick.Stop()
  232. for {
  233. select {
  234. case <-tick.C:
  235. if err := db.expireNodes(); err != nil {
  236. log.Error("Failed to expire nodedb items", "err", err)
  237. }
  238. case <-db.quit:
  239. return
  240. }
  241. }
  242. }
  243. // expireNodes iterates over the database and deletes all nodes that have not
  244. // been seen (i.e. received a pong from) for some allotted time.
  245. func (db *DB) expireNodes() error {
  246. threshold := time.Now().Add(-dbNodeExpiration)
  247. // Find discovered nodes that are older than the allowance
  248. it := db.lvl.NewIterator(nil, nil)
  249. defer it.Release()
  250. for it.Next() {
  251. // Skip the item if not a discovery node
  252. id, field := splitKey(it.Key())
  253. if field != dbDiscoverRoot {
  254. continue
  255. }
  256. // Skip the node if not expired yet (and not self)
  257. if seen := db.LastPongReceived(id); seen.After(threshold) {
  258. continue
  259. }
  260. // Otherwise delete all associated information
  261. db.DeleteNode(id)
  262. }
  263. return nil
  264. }
  265. // LastPingReceived retrieves the time of the last ping packet received from
  266. // a remote node.
  267. func (db *DB) LastPingReceived(id ID) time.Time {
  268. return time.Unix(db.fetchInt64(makeKey(id, dbDiscoverPing)), 0)
  269. }
  270. // UpdateLastPingReceived updates the last time we tried contacting a remote node.
  271. func (db *DB) UpdateLastPingReceived(id ID, instance time.Time) error {
  272. return db.storeInt64(makeKey(id, dbDiscoverPing), instance.Unix())
  273. }
  274. // LastPongReceived retrieves the time of the last successful pong from remote node.
  275. func (db *DB) LastPongReceived(id ID) time.Time {
  276. // Launch expirer
  277. db.ensureExpirer()
  278. return time.Unix(db.fetchInt64(makeKey(id, dbDiscoverPong)), 0)
  279. }
  280. // UpdateLastPongReceived updates the last pong time of a node.
  281. func (db *DB) UpdateLastPongReceived(id ID, instance time.Time) error {
  282. return db.storeInt64(makeKey(id, dbDiscoverPong), instance.Unix())
  283. }
  284. // FindFails retrieves the number of findnode failures since bonding.
  285. func (db *DB) FindFails(id ID) int {
  286. return int(db.fetchInt64(makeKey(id, dbDiscoverFindFails)))
  287. }
  288. // UpdateFindFails updates the number of findnode failures since bonding.
  289. func (db *DB) UpdateFindFails(id ID, fails int) error {
  290. return db.storeInt64(makeKey(id, dbDiscoverFindFails), int64(fails))
  291. }
  292. // LocalSeq retrieves the local record sequence counter.
  293. func (db *DB) localSeq(id ID) uint64 {
  294. return db.fetchUint64(makeKey(id, dbLocalSeq))
  295. }
  296. // storeLocalSeq stores the local record sequence counter.
  297. func (db *DB) storeLocalSeq(id ID, n uint64) {
  298. db.storeUint64(makeKey(id, dbLocalSeq), n)
  299. }
  300. // QuerySeeds retrieves random nodes to be used as potential seed nodes
  301. // for bootstrapping.
  302. func (db *DB) QuerySeeds(n int, maxAge time.Duration) []*Node {
  303. var (
  304. now = time.Now()
  305. nodes = make([]*Node, 0, n)
  306. it = db.lvl.NewIterator(nil, nil)
  307. id ID
  308. )
  309. defer it.Release()
  310. seek:
  311. for seeks := 0; len(nodes) < n && seeks < n*5; seeks++ {
  312. // Seek to a random entry. The first byte is incremented by a
  313. // random amount each time in order to increase the likelihood
  314. // of hitting all existing nodes in very small databases.
  315. ctr := id[0]
  316. rand.Read(id[:])
  317. id[0] = ctr + id[0]%16
  318. it.Seek(makeKey(id, dbDiscoverRoot))
  319. n := nextNode(it)
  320. if n == nil {
  321. id[0] = 0
  322. continue seek // iterator exhausted
  323. }
  324. if now.Sub(db.LastPongReceived(n.ID())) > maxAge {
  325. continue seek
  326. }
  327. for i := range nodes {
  328. if nodes[i].ID() == n.ID() {
  329. continue seek // duplicate
  330. }
  331. }
  332. nodes = append(nodes, n)
  333. }
  334. return nodes
  335. }
  336. // reads the next node record from the iterator, skipping over other
  337. // database entries.
  338. func nextNode(it iterator.Iterator) *Node {
  339. for end := false; !end; end = !it.Next() {
  340. id, field := splitKey(it.Key())
  341. if field != dbDiscoverRoot {
  342. continue
  343. }
  344. return mustDecodeNode(id[:], it.Value())
  345. }
  346. return nil
  347. }
  348. // close flushes and closes the database files.
  349. func (db *DB) Close() {
  350. close(db.quit)
  351. db.lvl.Close()
  352. }