nodedb.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447
  1. // Copyright 2018 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package enode
  17. import (
  18. "bytes"
  19. "crypto/rand"
  20. "encoding/binary"
  21. "fmt"
  22. "net"
  23. "os"
  24. "sync"
  25. "time"
  26. "github.com/ethereum/go-ethereum/rlp"
  27. "github.com/syndtr/goleveldb/leveldb"
  28. "github.com/syndtr/goleveldb/leveldb/errors"
  29. "github.com/syndtr/goleveldb/leveldb/iterator"
  30. "github.com/syndtr/goleveldb/leveldb/opt"
  31. "github.com/syndtr/goleveldb/leveldb/storage"
  32. "github.com/syndtr/goleveldb/leveldb/util"
  33. )
  // Keys in the node database.
  const (
  	dbVersionKey   = "version" // Key holding the database version; the database is flushed when it differs
  	dbNodePrefix   = "n:"      // Prefix of every node entry
  	dbLocalPrefix  = "local:"  // Prefix of every local node entry
  	dbDiscoverRoot = "v4"      // Key segment that follows the node ID in node keys
  )

  // Per-node metadata fields. They are stored per ID and IP, so the full key
  // looks like "n:<ID>:v4:<IP>:findfail". Use nodeItemKey to create those keys.
  const (
  	dbNodeFindFails = "findfail"
  	dbNodePing      = "lastping"
  	dbNodePong      = "lastpong"
  	dbNodeSeq       = "seq"
  )

  // Local node fields. They are keyed by ID only, so the full key looks like
  // "local:<ID>:seq". Use localItemKey to create those keys.
  const dbLocalSeq = "seq"
  // Tuning parameters of the node database.
  const (
  	// dbNodeExpiration is the time after which an unseen node should be dropped.
  	dbNodeExpiration = 24 * time.Hour

  	// dbCleanupCycle is the time period for running the expiration task.
  	dbCleanupCycle = time.Hour

  	// dbVersion is the version written under dbVersionKey; a persistent
  	// database holding a different value is wiped when it is opened.
  	dbVersion = 9
  )
  // zeroIP is the all-zero 16-byte address. It stands in for the IP component
  // of node item keys that are not tied to a particular address (see NodeSeq).
  var zeroIP = make(net.IP, net.IPv6len)
  56. // DB is the node database, storing previously seen nodes and any collected metadata about
  57. // them for QoS purposes.
  58. type DB struct {
  59. lvl *leveldb.DB // Interface to the database itself
  60. runner sync.Once // Ensures we can start at most one expirer
  61. quit chan struct{} // Channel to signal the expiring thread to stop
  62. }
  63. // OpenDB opens a node database for storing and retrieving infos about known peers in the
  64. // network. If no path is given an in-memory, temporary database is constructed.
  65. func OpenDB(path string) (*DB, error) {
  66. if path == "" {
  67. return newMemoryDB()
  68. }
  69. return newPersistentDB(path)
  70. }
  71. // newMemoryNodeDB creates a new in-memory node database without a persistent backend.
  72. func newMemoryDB() (*DB, error) {
  73. db, err := leveldb.Open(storage.NewMemStorage(), nil)
  74. if err != nil {
  75. return nil, err
  76. }
  77. return &DB{lvl: db, quit: make(chan struct{})}, nil
  78. }
  79. // newPersistentNodeDB creates/opens a leveldb backed persistent node database,
  80. // also flushing its contents in case of a version mismatch.
  81. func newPersistentDB(path string) (*DB, error) {
  82. opts := &opt.Options{OpenFilesCacheCapacity: 5}
  83. db, err := leveldb.OpenFile(path, opts)
  84. if _, iscorrupted := err.(*errors.ErrCorrupted); iscorrupted {
  85. db, err = leveldb.RecoverFile(path, nil)
  86. }
  87. if err != nil {
  88. return nil, err
  89. }
  90. // The nodes contained in the cache correspond to a certain protocol version.
  91. // Flush all nodes if the version doesn't match.
  92. currentVer := make([]byte, binary.MaxVarintLen64)
  93. currentVer = currentVer[:binary.PutVarint(currentVer, int64(dbVersion))]
  94. blob, err := db.Get([]byte(dbVersionKey), nil)
  95. switch err {
  96. case leveldb.ErrNotFound:
  97. // Version not found (i.e. empty cache), insert it
  98. if err := db.Put([]byte(dbVersionKey), currentVer, nil); err != nil {
  99. db.Close()
  100. return nil, err
  101. }
  102. case nil:
  103. // Version present, flush if different
  104. if !bytes.Equal(blob, currentVer) {
  105. db.Close()
  106. if err = os.RemoveAll(path); err != nil {
  107. return nil, err
  108. }
  109. return newPersistentDB(path)
  110. }
  111. }
  112. return &DB{lvl: db, quit: make(chan struct{})}, nil
  113. }
  114. // nodeKey returns the database key for a node record.
  115. func nodeKey(id ID) []byte {
  116. key := append([]byte(dbNodePrefix), id[:]...)
  117. key = append(key, ':')
  118. key = append(key, dbDiscoverRoot...)
  119. return key
  120. }
  121. // splitNodeKey returns the node ID of a key created by nodeKey.
  122. func splitNodeKey(key []byte) (id ID, rest []byte) {
  123. if !bytes.HasPrefix(key, []byte(dbNodePrefix)) {
  124. return ID{}, nil
  125. }
  126. item := key[len(dbNodePrefix):]
  127. copy(id[:], item[:len(id)])
  128. return id, item[len(id)+1:]
  129. }
  130. // nodeItemKey returns the database key for a node metadata field.
  131. func nodeItemKey(id ID, ip net.IP, field string) []byte {
  132. ip16 := ip.To16()
  133. if ip16 == nil {
  134. panic(fmt.Errorf("invalid IP (length %d)", len(ip)))
  135. }
  136. return bytes.Join([][]byte{nodeKey(id), ip16, []byte(field)}, []byte{':'})
  137. }
  138. // splitNodeItemKey returns the components of a key created by nodeItemKey.
  139. func splitNodeItemKey(key []byte) (id ID, ip net.IP, field string) {
  140. id, key = splitNodeKey(key)
  141. // Skip discover root.
  142. if string(key) == dbDiscoverRoot {
  143. return id, nil, ""
  144. }
  145. key = key[len(dbDiscoverRoot)+1:]
  146. // Split out the IP.
  147. ip = net.IP(key[:16])
  148. if ip4 := ip.To4(); ip4 != nil {
  149. ip = ip4
  150. }
  151. key = key[16+1:]
  152. // Field is the remainder of key.
  153. field = string(key)
  154. return id, ip, field
  155. }
  156. // localItemKey returns the key of a local node item.
  157. func localItemKey(id ID, field string) []byte {
  158. key := append([]byte(dbLocalPrefix), id[:]...)
  159. key = append(key, ':')
  160. key = append(key, field...)
  161. return key
  162. }
  163. // fetchInt64 retrieves an integer associated with a particular key.
  164. func (db *DB) fetchInt64(key []byte) int64 {
  165. blob, err := db.lvl.Get(key, nil)
  166. if err != nil {
  167. return 0
  168. }
  169. val, read := binary.Varint(blob)
  170. if read <= 0 {
  171. return 0
  172. }
  173. return val
  174. }
  175. // storeInt64 stores an integer in the given key.
  176. func (db *DB) storeInt64(key []byte, n int64) error {
  177. blob := make([]byte, binary.MaxVarintLen64)
  178. blob = blob[:binary.PutVarint(blob, n)]
  179. return db.lvl.Put(key, blob, nil)
  180. }
  181. // fetchUint64 retrieves an integer associated with a particular key.
  182. func (db *DB) fetchUint64(key []byte) uint64 {
  183. blob, err := db.lvl.Get(key, nil)
  184. if err != nil {
  185. return 0
  186. }
  187. val, _ := binary.Uvarint(blob)
  188. return val
  189. }
  190. // storeUint64 stores an integer in the given key.
  191. func (db *DB) storeUint64(key []byte, n uint64) error {
  192. blob := make([]byte, binary.MaxVarintLen64)
  193. blob = blob[:binary.PutUvarint(blob, n)]
  194. return db.lvl.Put(key, blob, nil)
  195. }
  196. // Node retrieves a node with a given id from the database.
  197. func (db *DB) Node(id ID) *Node {
  198. blob, err := db.lvl.Get(nodeKey(id), nil)
  199. if err != nil {
  200. return nil
  201. }
  202. return mustDecodeNode(id[:], blob)
  203. }
  204. func mustDecodeNode(id, data []byte) *Node {
  205. node := new(Node)
  206. if err := rlp.DecodeBytes(data, &node.r); err != nil {
  207. panic(fmt.Errorf("p2p/enode: can't decode node %x in DB: %v", id, err))
  208. }
  209. // Restore node id cache.
  210. copy(node.id[:], id)
  211. return node
  212. }
  213. // UpdateNode inserts - potentially overwriting - a node into the peer database.
  214. func (db *DB) UpdateNode(node *Node) error {
  215. if node.Seq() < db.NodeSeq(node.ID()) {
  216. return nil
  217. }
  218. blob, err := rlp.EncodeToBytes(&node.r)
  219. if err != nil {
  220. return err
  221. }
  222. if err := db.lvl.Put(nodeKey(node.ID()), blob, nil); err != nil {
  223. return err
  224. }
  225. return db.storeUint64(nodeItemKey(node.ID(), zeroIP, dbNodeSeq), node.Seq())
  226. }
  227. // NodeSeq returns the stored record sequence number of the given node.
  228. func (db *DB) NodeSeq(id ID) uint64 {
  229. return db.fetchUint64(nodeItemKey(id, zeroIP, dbNodeSeq))
  230. }
  231. // Resolve returns the stored record of the node if it has a larger sequence
  232. // number than n.
  233. func (db *DB) Resolve(n *Node) *Node {
  234. if n.Seq() > db.NodeSeq(n.ID()) {
  235. return n
  236. }
  237. return db.Node(n.ID())
  238. }
  239. // DeleteNode deletes all information associated with a node.
  240. func (db *DB) DeleteNode(id ID) {
  241. deleteRange(db.lvl, nodeKey(id))
  242. }
  243. func deleteRange(db *leveldb.DB, prefix []byte) {
  244. it := db.NewIterator(util.BytesPrefix(prefix), nil)
  245. defer it.Release()
  246. for it.Next() {
  247. db.Delete(it.Key(), nil)
  248. }
  249. }
  250. // ensureExpirer is a small helper method ensuring that the data expiration
  251. // mechanism is running. If the expiration goroutine is already running, this
  252. // method simply returns.
  253. //
  254. // The goal is to start the data evacuation only after the network successfully
  255. // bootstrapped itself (to prevent dumping potentially useful seed nodes). Since
  256. // it would require significant overhead to exactly trace the first successful
  257. // convergence, it's simpler to "ensure" the correct state when an appropriate
  258. // condition occurs (i.e. a successful bonding), and discard further events.
  259. func (db *DB) ensureExpirer() {
  260. db.runner.Do(func() { go db.expirer() })
  261. }
  262. // expirer should be started in a go routine, and is responsible for looping ad
  263. // infinitum and dropping stale data from the database.
  264. func (db *DB) expirer() {
  265. tick := time.NewTicker(dbCleanupCycle)
  266. defer tick.Stop()
  267. for {
  268. select {
  269. case <-tick.C:
  270. db.expireNodes()
  271. case <-db.quit:
  272. return
  273. }
  274. }
  275. }
  276. // expireNodes iterates over the database and deletes all nodes that have not
  277. // been seen (i.e. received a pong from) for some time.
  278. func (db *DB) expireNodes() {
  279. it := db.lvl.NewIterator(util.BytesPrefix([]byte(dbNodePrefix)), nil)
  280. defer it.Release()
  281. if !it.Next() {
  282. return
  283. }
  284. var (
  285. threshold = time.Now().Add(-dbNodeExpiration).Unix()
  286. youngestPong int64
  287. atEnd = false
  288. )
  289. for !atEnd {
  290. id, ip, field := splitNodeItemKey(it.Key())
  291. if field == dbNodePong {
  292. time, _ := binary.Varint(it.Value())
  293. if time > youngestPong {
  294. youngestPong = time
  295. }
  296. if time < threshold {
  297. // Last pong from this IP older than threshold, remove fields belonging to it.
  298. deleteRange(db.lvl, nodeItemKey(id, ip, ""))
  299. }
  300. }
  301. atEnd = !it.Next()
  302. nextID, _ := splitNodeKey(it.Key())
  303. if atEnd || nextID != id {
  304. // We've moved beyond the last entry of the current ID.
  305. // Remove everything if there was no recent enough pong.
  306. if youngestPong > 0 && youngestPong < threshold {
  307. deleteRange(db.lvl, nodeKey(id))
  308. }
  309. youngestPong = 0
  310. }
  311. }
  312. }
  313. // LastPingReceived retrieves the time of the last ping packet received from
  314. // a remote node.
  315. func (db *DB) LastPingReceived(id ID, ip net.IP) time.Time {
  316. return time.Unix(db.fetchInt64(nodeItemKey(id, ip, dbNodePing)), 0)
  317. }
  318. // UpdateLastPingReceived updates the last time we tried contacting a remote node.
  319. func (db *DB) UpdateLastPingReceived(id ID, ip net.IP, instance time.Time) error {
  320. return db.storeInt64(nodeItemKey(id, ip, dbNodePing), instance.Unix())
  321. }
  322. // LastPongReceived retrieves the time of the last successful pong from remote node.
  323. func (db *DB) LastPongReceived(id ID, ip net.IP) time.Time {
  324. // Launch expirer
  325. db.ensureExpirer()
  326. return time.Unix(db.fetchInt64(nodeItemKey(id, ip, dbNodePong)), 0)
  327. }
  328. // UpdateLastPongReceived updates the last pong time of a node.
  329. func (db *DB) UpdateLastPongReceived(id ID, ip net.IP, instance time.Time) error {
  330. return db.storeInt64(nodeItemKey(id, ip, dbNodePong), instance.Unix())
  331. }
  332. // FindFails retrieves the number of findnode failures since bonding.
  333. func (db *DB) FindFails(id ID, ip net.IP) int {
  334. return int(db.fetchInt64(nodeItemKey(id, ip, dbNodeFindFails)))
  335. }
  336. // UpdateFindFails updates the number of findnode failures since bonding.
  337. func (db *DB) UpdateFindFails(id ID, ip net.IP, fails int) error {
  338. return db.storeInt64(nodeItemKey(id, ip, dbNodeFindFails), int64(fails))
  339. }
  340. // LocalSeq retrieves the local record sequence counter.
  341. func (db *DB) localSeq(id ID) uint64 {
  342. return db.fetchUint64(localItemKey(id, dbLocalSeq))
  343. }
  344. // storeLocalSeq stores the local record sequence counter.
  345. func (db *DB) storeLocalSeq(id ID, n uint64) {
  346. db.storeUint64(localItemKey(id, dbLocalSeq), n)
  347. }
  348. // QuerySeeds retrieves random nodes to be used as potential seed nodes
  349. // for bootstrapping.
  350. func (db *DB) QuerySeeds(n int, maxAge time.Duration) []*Node {
  351. var (
  352. now = time.Now()
  353. nodes = make([]*Node, 0, n)
  354. it = db.lvl.NewIterator(nil, nil)
  355. id ID
  356. )
  357. defer it.Release()
  358. seek:
  359. for seeks := 0; len(nodes) < n && seeks < n*5; seeks++ {
  360. // Seek to a random entry. The first byte is incremented by a
  361. // random amount each time in order to increase the likelihood
  362. // of hitting all existing nodes in very small databases.
  363. ctr := id[0]
  364. rand.Read(id[:])
  365. id[0] = ctr + id[0]%16
  366. it.Seek(nodeKey(id))
  367. n := nextNode(it)
  368. if n == nil {
  369. id[0] = 0
  370. continue seek // iterator exhausted
  371. }
  372. if now.Sub(db.LastPongReceived(n.ID(), n.IP())) > maxAge {
  373. continue seek
  374. }
  375. for i := range nodes {
  376. if nodes[i].ID() == n.ID() {
  377. continue seek // duplicate
  378. }
  379. }
  380. nodes = append(nodes, n)
  381. }
  382. return nodes
  383. }
  384. // reads the next node record from the iterator, skipping over other
  385. // database entries.
  386. func nextNode(it iterator.Iterator) *Node {
  387. for end := false; !end; end = !it.Next() {
  388. id, rest := splitNodeKey(it.Key())
  389. if string(rest) != dbDiscoverRoot {
  390. continue
  391. }
  392. return mustDecodeNode(id[:], it.Value())
  393. }
  394. return nil
  395. }
  396. // close flushes and closes the database files.
  397. func (db *DB) Close() {
  398. close(db.quit)
  399. db.lvl.Close()
  400. }