nodestate.go 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880
  1. // Copyright 2020 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package nodestate
  17. import (
  18. "errors"
  19. "reflect"
  20. "sync"
  21. "time"
  22. "unsafe"
  23. "github.com/ethereum/go-ethereum/common/mclock"
  24. "github.com/ethereum/go-ethereum/ethdb"
  25. "github.com/ethereum/go-ethereum/log"
  26. "github.com/ethereum/go-ethereum/metrics"
  27. "github.com/ethereum/go-ethereum/p2p/enode"
  28. "github.com/ethereum/go-ethereum/p2p/enr"
  29. "github.com/ethereum/go-ethereum/rlp"
  30. )
  31. type (
  32. // NodeStateMachine connects different system components operating on subsets of
  33. // network nodes. Node states are represented by 64 bit vectors with each bit assigned
  34. // to a state flag. Each state flag has a descriptor structure and the mapping is
  35. // created automatically. It is possible to subscribe to subsets of state flags and
  36. // receive a callback if one of the nodes has a relevant state flag changed.
  37. // Callbacks can also modify further flags of the same node or other nodes. State
  38. // updates only return after all immediate effects throughout the system have happened
  39. // (deadlocks should be avoided by design of the implemented state logic). The caller
  40. // can also add timeouts assigned to a certain node and a subset of state flags.
  41. // If the timeout elapses, the flags are reset. If all relevant flags are reset then
  42. // the timer is dropped. State flags with no timeout are persisted in the database
  43. // if the flag descriptor enables saving. If a node has no state flags set at any
  44. // moment then it is discarded.
  45. //
  46. // Extra node fields can also be registered so system components can also store more
  47. // complex state for each node that is relevant to them, without creating a custom
  48. // peer set. Fields can be shared across multiple components if they all know the
  49. // field ID. Subscription to fields is also possible. Persistent fields should have
  50. // an encoder and a decoder function.
  51. NodeStateMachine struct {
  52. started, stopped bool
  53. lock sync.Mutex
  54. clock mclock.Clock
  55. db ethdb.KeyValueStore
  56. dbNodeKey []byte
  57. nodes map[enode.ID]*nodeInfo
  58. offlineCallbackList []offlineCallback
  59. // Registered state flags or fields. Modifications are allowed
  60. // only when the node state machine has not been started.
  61. setup *Setup
  62. fields []*fieldInfo
  63. saveFlags bitMask
  64. // Installed callbacks. Modifications are allowed only when the
  65. // node state machine has not been started.
  66. stateSubs []stateSub
  67. // Testing hooks, only for testing purposes.
  68. saveNodeHook func(*nodeInfo)
  69. }
  70. // Flags represents a set of flags from a certain setup
  71. Flags struct {
  72. mask bitMask
  73. setup *Setup
  74. }
  75. // Field represents a field from a certain setup
  76. Field struct {
  77. index int
  78. setup *Setup
  79. }
  80. // flagDefinition describes a node state flag. Each registered instance is automatically
  81. // mapped to a bit of the 64 bit node states.
  82. // If persistent is true then the node is saved when state machine is shutdown.
  83. flagDefinition struct {
  84. name string
  85. persistent bool
  86. }
  87. // fieldDefinition describes an optional node field of the given type. The contents
  88. // of the field are only retained for each node as long as at least one of the
  89. // state flags is set.
  90. fieldDefinition struct {
  91. name string
  92. ftype reflect.Type
  93. encode func(interface{}) ([]byte, error)
  94. decode func([]byte) (interface{}, error)
  95. }
  96. // stateSetup contains the list of flags and fields used by the application
  97. Setup struct {
  98. Version uint
  99. flags []flagDefinition
  100. fields []fieldDefinition
  101. }
  102. // bitMask describes a node state or state mask. It represents a subset
  103. // of node flags with each bit assigned to a flag index (LSB represents flag 0).
  104. bitMask uint64
  105. // StateCallback is a subscription callback which is called when one of the
  106. // state flags that is included in the subscription state mask is changed.
  107. // Note: oldState and newState are also masked with the subscription mask so only
  108. // the relevant bits are included.
  109. StateCallback func(n *enode.Node, oldState, newState Flags)
  110. // FieldCallback is a subscription callback which is called when the value of
  111. // a specific field is changed.
  112. FieldCallback func(n *enode.Node, state Flags, oldValue, newValue interface{})
  113. // nodeInfo contains node state, fields and state timeouts
  114. nodeInfo struct {
  115. node *enode.Node
  116. state bitMask
  117. timeouts []*nodeStateTimeout
  118. fields []interface{}
  119. db, dirty bool
  120. }
  121. nodeInfoEnc struct {
  122. Enr enr.Record
  123. Version uint
  124. State bitMask
  125. Fields [][]byte
  126. }
  127. stateSub struct {
  128. mask bitMask
  129. callback StateCallback
  130. }
  131. nodeStateTimeout struct {
  132. mask bitMask
  133. timer mclock.Timer
  134. }
  135. fieldInfo struct {
  136. fieldDefinition
  137. subs []FieldCallback
  138. }
  139. offlineCallback struct {
  140. node *enode.Node
  141. state bitMask
  142. fields []interface{}
  143. }
  144. )
  145. // offlineState is a special state that is assumed to be set before a node is loaded from
  146. // the database and after it is shut down.
  147. const offlineState = bitMask(1)
  148. // NewFlag creates a new node state flag
  149. func (s *Setup) NewFlag(name string) Flags {
  150. if s.flags == nil {
  151. s.flags = []flagDefinition{{name: "offline"}}
  152. }
  153. f := Flags{mask: bitMask(1) << uint(len(s.flags)), setup: s}
  154. s.flags = append(s.flags, flagDefinition{name: name})
  155. return f
  156. }
  157. // NewPersistentFlag creates a new persistent node state flag
  158. func (s *Setup) NewPersistentFlag(name string) Flags {
  159. if s.flags == nil {
  160. s.flags = []flagDefinition{{name: "offline"}}
  161. }
  162. f := Flags{mask: bitMask(1) << uint(len(s.flags)), setup: s}
  163. s.flags = append(s.flags, flagDefinition{name: name, persistent: true})
  164. return f
  165. }
  166. // OfflineFlag returns the system-defined offline flag belonging to the given setup
  167. func (s *Setup) OfflineFlag() Flags {
  168. return Flags{mask: offlineState, setup: s}
  169. }
  170. // NewField creates a new node state field
  171. func (s *Setup) NewField(name string, ftype reflect.Type) Field {
  172. f := Field{index: len(s.fields), setup: s}
  173. s.fields = append(s.fields, fieldDefinition{
  174. name: name,
  175. ftype: ftype,
  176. })
  177. return f
  178. }
  179. // NewPersistentField creates a new persistent node field
  180. func (s *Setup) NewPersistentField(name string, ftype reflect.Type, encode func(interface{}) ([]byte, error), decode func([]byte) (interface{}, error)) Field {
  181. f := Field{index: len(s.fields), setup: s}
  182. s.fields = append(s.fields, fieldDefinition{
  183. name: name,
  184. ftype: ftype,
  185. encode: encode,
  186. decode: decode,
  187. })
  188. return f
  189. }
  190. // flagOp implements binary flag operations and also checks whether the operands belong to the same setup
  191. func flagOp(a, b Flags, trueIfA, trueIfB, trueIfBoth bool) Flags {
  192. if a.setup == nil {
  193. if a.mask != 0 {
  194. panic("Node state flags have no setup reference")
  195. }
  196. a.setup = b.setup
  197. }
  198. if b.setup == nil {
  199. if b.mask != 0 {
  200. panic("Node state flags have no setup reference")
  201. }
  202. b.setup = a.setup
  203. }
  204. if a.setup != b.setup {
  205. panic("Node state flags belong to a different setup")
  206. }
  207. res := Flags{setup: a.setup}
  208. if trueIfA {
  209. res.mask |= a.mask & ^b.mask
  210. }
  211. if trueIfB {
  212. res.mask |= b.mask & ^a.mask
  213. }
  214. if trueIfBoth {
  215. res.mask |= a.mask & b.mask
  216. }
  217. return res
  218. }
  219. // And returns the set of flags present in both a and b
  220. func (a Flags) And(b Flags) Flags { return flagOp(a, b, false, false, true) }
  221. // AndNot returns the set of flags present in a but not in b
  222. func (a Flags) AndNot(b Flags) Flags { return flagOp(a, b, true, false, false) }
  223. // Or returns the set of flags present in either a or b
  224. func (a Flags) Or(b Flags) Flags { return flagOp(a, b, true, true, true) }
  225. // Xor returns the set of flags present in either a or b but not both
  226. func (a Flags) Xor(b Flags) Flags { return flagOp(a, b, true, true, false) }
  227. // HasAll returns true if b is a subset of a
  228. func (a Flags) HasAll(b Flags) bool { return flagOp(a, b, false, true, false).mask == 0 }
  229. // HasNone returns true if a and b have no shared flags
  230. func (a Flags) HasNone(b Flags) bool { return flagOp(a, b, false, false, true).mask == 0 }
  231. // Equals returns true if a and b have the same flags set
  232. func (a Flags) Equals(b Flags) bool { return flagOp(a, b, true, true, false).mask == 0 }
  233. // IsEmpty returns true if a has no flags set
  234. func (a Flags) IsEmpty() bool { return a.mask == 0 }
  235. // MergeFlags merges multiple sets of state flags
  236. func MergeFlags(list ...Flags) Flags {
  237. if len(list) == 0 {
  238. return Flags{}
  239. }
  240. res := list[0]
  241. for i := 1; i < len(list); i++ {
  242. res = res.Or(list[i])
  243. }
  244. return res
  245. }
  246. // String returns a list of the names of the flags specified in the bit mask
  247. func (f Flags) String() string {
  248. if f.mask == 0 {
  249. return "[]"
  250. }
  251. s := "["
  252. comma := false
  253. for index, flag := range f.setup.flags {
  254. if f.mask&(bitMask(1)<<uint(index)) != 0 {
  255. if comma {
  256. s = s + ", "
  257. }
  258. s = s + flag.name
  259. comma = true
  260. }
  261. }
  262. s = s + "]"
  263. return s
  264. }
  265. // NewNodeStateMachine creates a new node state machine.
  266. // If db is not nil then the node states, fields and active timeouts are persisted.
  267. // Persistence can be enabled or disabled for each state flag and field.
  268. func NewNodeStateMachine(db ethdb.KeyValueStore, dbKey []byte, clock mclock.Clock, setup *Setup) *NodeStateMachine {
  269. if setup.flags == nil {
  270. panic("No state flags defined")
  271. }
  272. if len(setup.flags) > 8*int(unsafe.Sizeof(bitMask(0))) {
  273. panic("Too many node state flags")
  274. }
  275. ns := &NodeStateMachine{
  276. db: db,
  277. dbNodeKey: dbKey,
  278. clock: clock,
  279. setup: setup,
  280. nodes: make(map[enode.ID]*nodeInfo),
  281. fields: make([]*fieldInfo, len(setup.fields)),
  282. }
  283. stateNameMap := make(map[string]int)
  284. for index, flag := range setup.flags {
  285. if _, ok := stateNameMap[flag.name]; ok {
  286. panic("Node state flag name collision")
  287. }
  288. stateNameMap[flag.name] = index
  289. if flag.persistent {
  290. ns.saveFlags |= bitMask(1) << uint(index)
  291. }
  292. }
  293. fieldNameMap := make(map[string]int)
  294. for index, field := range setup.fields {
  295. if _, ok := fieldNameMap[field.name]; ok {
  296. panic("Node field name collision")
  297. }
  298. ns.fields[index] = &fieldInfo{fieldDefinition: field}
  299. fieldNameMap[field.name] = index
  300. }
  301. return ns
  302. }
  303. // stateMask checks whether the set of flags belongs to the same setup and returns its internal bit mask
  304. func (ns *NodeStateMachine) stateMask(flags Flags) bitMask {
  305. if flags.setup != ns.setup && flags.mask != 0 {
  306. panic("Node state flags belong to a different setup")
  307. }
  308. return flags.mask
  309. }
  310. // fieldIndex checks whether the field belongs to the same setup and returns its internal index
  311. func (ns *NodeStateMachine) fieldIndex(field Field) int {
  312. if field.setup != ns.setup {
  313. panic("Node field belongs to a different setup")
  314. }
  315. return field.index
  316. }
  317. // SubscribeState adds a node state subscription. The callback is called while the state
  318. // machine mutex is not held and it is allowed to make further state updates. All immediate
  319. // changes throughout the system are processed in the same thread/goroutine. It is the
  320. // responsibility of the implemented state logic to avoid deadlocks caused by the callbacks,
  321. // infinite toggling of flags or hazardous/non-deterministic state changes.
  322. // State subscriptions should be installed before loading the node database or making the
  323. // first state update.
  324. func (ns *NodeStateMachine) SubscribeState(flags Flags, callback StateCallback) {
  325. ns.lock.Lock()
  326. defer ns.lock.Unlock()
  327. if ns.started {
  328. panic("state machine already started")
  329. }
  330. ns.stateSubs = append(ns.stateSubs, stateSub{ns.stateMask(flags), callback})
  331. }
  332. // SubscribeField adds a node field subscription. Same rules apply as for SubscribeState.
  333. func (ns *NodeStateMachine) SubscribeField(field Field, callback FieldCallback) {
  334. ns.lock.Lock()
  335. defer ns.lock.Unlock()
  336. if ns.started {
  337. panic("state machine already started")
  338. }
  339. f := ns.fields[ns.fieldIndex(field)]
  340. f.subs = append(f.subs, callback)
  341. }
  342. // newNode creates a new nodeInfo
  343. func (ns *NodeStateMachine) newNode(n *enode.Node) *nodeInfo {
  344. return &nodeInfo{node: n, fields: make([]interface{}, len(ns.fields))}
  345. }
  346. // checkStarted checks whether the state machine has already been started and panics otherwise.
  347. func (ns *NodeStateMachine) checkStarted() {
  348. if !ns.started {
  349. panic("state machine not started yet")
  350. }
  351. }
  352. // Start starts the state machine, enabling state and field operations and disabling
  353. // further subscriptions.
  354. func (ns *NodeStateMachine) Start() {
  355. ns.lock.Lock()
  356. if ns.started {
  357. panic("state machine already started")
  358. }
  359. ns.started = true
  360. if ns.db != nil {
  361. ns.loadFromDb()
  362. }
  363. ns.lock.Unlock()
  364. ns.offlineCallbacks(true)
  365. }
  366. // Stop stops the state machine and saves its state if a database was supplied
  367. func (ns *NodeStateMachine) Stop() {
  368. ns.lock.Lock()
  369. for _, node := range ns.nodes {
  370. fields := make([]interface{}, len(node.fields))
  371. copy(fields, node.fields)
  372. ns.offlineCallbackList = append(ns.offlineCallbackList, offlineCallback{node.node, node.state, fields})
  373. }
  374. ns.stopped = true
  375. if ns.db != nil {
  376. ns.saveToDb()
  377. ns.lock.Unlock()
  378. } else {
  379. ns.lock.Unlock()
  380. }
  381. ns.offlineCallbacks(false)
  382. }
  383. // loadFromDb loads persisted node states from the database
  384. func (ns *NodeStateMachine) loadFromDb() {
  385. it := ns.db.NewIterator(ns.dbNodeKey, nil)
  386. for it.Next() {
  387. var id enode.ID
  388. if len(it.Key()) != len(ns.dbNodeKey)+len(id) {
  389. log.Error("Node state db entry with invalid length", "found", len(it.Key()), "expected", len(ns.dbNodeKey)+len(id))
  390. continue
  391. }
  392. copy(id[:], it.Key()[len(ns.dbNodeKey):])
  393. ns.decodeNode(id, it.Value())
  394. }
  395. }
  396. type dummyIdentity enode.ID
  397. func (id dummyIdentity) Verify(r *enr.Record, sig []byte) error { return nil }
  398. func (id dummyIdentity) NodeAddr(r *enr.Record) []byte { return id[:] }
  399. // decodeNode decodes a node database entry and adds it to the node set if successful
  400. func (ns *NodeStateMachine) decodeNode(id enode.ID, data []byte) {
  401. var enc nodeInfoEnc
  402. if err := rlp.DecodeBytes(data, &enc); err != nil {
  403. log.Error("Failed to decode node info", "id", id, "error", err)
  404. return
  405. }
  406. n, _ := enode.New(dummyIdentity(id), &enc.Enr)
  407. node := ns.newNode(n)
  408. node.db = true
  409. if enc.Version != ns.setup.Version {
  410. log.Debug("Removing stored node with unknown version", "current", ns.setup.Version, "stored", enc.Version)
  411. ns.deleteNode(id)
  412. return
  413. }
  414. if len(enc.Fields) > len(ns.setup.fields) {
  415. log.Error("Invalid node field count", "id", id, "stored", len(enc.Fields))
  416. return
  417. }
  418. // Resolve persisted node fields
  419. for i, encField := range enc.Fields {
  420. if len(encField) == 0 {
  421. continue
  422. }
  423. if decode := ns.fields[i].decode; decode != nil {
  424. if field, err := decode(encField); err == nil {
  425. node.fields[i] = field
  426. } else {
  427. log.Error("Failed to decode node field", "id", id, "field name", ns.fields[i].name, "error", err)
  428. return
  429. }
  430. } else {
  431. log.Error("Cannot decode node field", "id", id, "field name", ns.fields[i].name)
  432. return
  433. }
  434. }
  435. // It's a compatible node record, add it to set.
  436. ns.nodes[id] = node
  437. node.state = enc.State
  438. fields := make([]interface{}, len(node.fields))
  439. copy(fields, node.fields)
  440. ns.offlineCallbackList = append(ns.offlineCallbackList, offlineCallback{node.node, node.state, fields})
  441. log.Debug("Loaded node state", "id", id, "state", Flags{mask: enc.State, setup: ns.setup})
  442. }
  443. // saveNode saves the given node info to the database
  444. func (ns *NodeStateMachine) saveNode(id enode.ID, node *nodeInfo) error {
  445. if ns.db == nil {
  446. return nil
  447. }
  448. storedState := node.state & ns.saveFlags
  449. for _, t := range node.timeouts {
  450. storedState &= ^t.mask
  451. }
  452. if storedState == 0 {
  453. if node.db {
  454. node.db = false
  455. ns.deleteNode(id)
  456. }
  457. node.dirty = false
  458. return nil
  459. }
  460. enc := nodeInfoEnc{
  461. Enr: *node.node.Record(),
  462. Version: ns.setup.Version,
  463. State: storedState,
  464. Fields: make([][]byte, len(ns.fields)),
  465. }
  466. log.Debug("Saved node state", "id", id, "state", Flags{mask: enc.State, setup: ns.setup})
  467. lastIndex := -1
  468. for i, f := range node.fields {
  469. if f == nil {
  470. continue
  471. }
  472. encode := ns.fields[i].encode
  473. if encode == nil {
  474. continue
  475. }
  476. blob, err := encode(f)
  477. if err != nil {
  478. return err
  479. }
  480. enc.Fields[i] = blob
  481. lastIndex = i
  482. }
  483. enc.Fields = enc.Fields[:lastIndex+1]
  484. data, err := rlp.EncodeToBytes(&enc)
  485. if err != nil {
  486. return err
  487. }
  488. if err := ns.db.Put(append(ns.dbNodeKey, id[:]...), data); err != nil {
  489. return err
  490. }
  491. node.dirty, node.db = false, true
  492. if ns.saveNodeHook != nil {
  493. ns.saveNodeHook(node)
  494. }
  495. return nil
  496. }
  497. // deleteNode removes a node info from the database
  498. func (ns *NodeStateMachine) deleteNode(id enode.ID) {
  499. ns.db.Delete(append(ns.dbNodeKey, id[:]...))
  500. }
  501. // saveToDb saves the persistent flags and fields of all nodes that have been changed
  502. func (ns *NodeStateMachine) saveToDb() {
  503. for id, node := range ns.nodes {
  504. if node.dirty {
  505. err := ns.saveNode(id, node)
  506. if err != nil {
  507. log.Error("Failed to save node", "id", id, "error", err)
  508. }
  509. }
  510. }
  511. }
  512. // updateEnode updates the enode entry belonging to the given node if it already exists
  513. func (ns *NodeStateMachine) updateEnode(n *enode.Node) (enode.ID, *nodeInfo) {
  514. id := n.ID()
  515. node := ns.nodes[id]
  516. if node != nil && n.Seq() > node.node.Seq() {
  517. node.node = n
  518. }
  519. return id, node
  520. }
  521. // Persist saves the persistent state and fields of the given node immediately
  522. func (ns *NodeStateMachine) Persist(n *enode.Node) error {
  523. ns.lock.Lock()
  524. defer ns.lock.Unlock()
  525. ns.checkStarted()
  526. if id, node := ns.updateEnode(n); node != nil && node.dirty {
  527. err := ns.saveNode(id, node)
  528. if err != nil {
  529. log.Error("Failed to save node", "id", id, "error", err)
  530. }
  531. return err
  532. }
  533. return nil
  534. }
  535. // SetState updates the given node state flags and processes all resulting callbacks.
  536. // It only returns after all subsequent immediate changes (including those changed by the
  537. // callbacks) have been processed. If a flag with a timeout is set again, the operation
  538. // removes or replaces the existing timeout.
  539. func (ns *NodeStateMachine) SetState(n *enode.Node, setFlags, resetFlags Flags, timeout time.Duration) {
  540. ns.lock.Lock()
  541. ns.checkStarted()
  542. if ns.stopped {
  543. ns.lock.Unlock()
  544. return
  545. }
  546. set, reset := ns.stateMask(setFlags), ns.stateMask(resetFlags)
  547. id, node := ns.updateEnode(n)
  548. if node == nil {
  549. if set == 0 {
  550. ns.lock.Unlock()
  551. return
  552. }
  553. node = ns.newNode(n)
  554. ns.nodes[id] = node
  555. }
  556. oldState := node.state
  557. newState := (node.state & (^reset)) | set
  558. changed := oldState ^ newState
  559. node.state = newState
  560. // Remove the timeout callbacks for all reset and set flags,
  561. // even they are not existent(it's noop).
  562. ns.removeTimeouts(node, set|reset)
  563. // Register the timeout callback if the new state is not empty
  564. // and timeout itself is required.
  565. if timeout != 0 && newState != 0 {
  566. ns.addTimeout(n, set, timeout)
  567. }
  568. if newState == oldState {
  569. ns.lock.Unlock()
  570. return
  571. }
  572. if newState == 0 {
  573. delete(ns.nodes, id)
  574. if node.db {
  575. ns.deleteNode(id)
  576. }
  577. } else {
  578. if changed&ns.saveFlags != 0 {
  579. node.dirty = true
  580. }
  581. }
  582. ns.lock.Unlock()
  583. // call state update subscription callbacks without holding the mutex
  584. for _, sub := range ns.stateSubs {
  585. if changed&sub.mask != 0 {
  586. sub.callback(n, Flags{mask: oldState & sub.mask, setup: ns.setup}, Flags{mask: newState & sub.mask, setup: ns.setup})
  587. }
  588. }
  589. if newState == 0 {
  590. // call field subscriptions for discarded fields
  591. for i, v := range node.fields {
  592. if v != nil {
  593. f := ns.fields[i]
  594. if len(f.subs) > 0 {
  595. for _, cb := range f.subs {
  596. cb(n, Flags{setup: ns.setup}, v, nil)
  597. }
  598. }
  599. }
  600. }
  601. }
  602. }
  603. // offlineCallbacks calls state update callbacks at startup or shutdown
  604. func (ns *NodeStateMachine) offlineCallbacks(start bool) {
  605. for _, cb := range ns.offlineCallbackList {
  606. for _, sub := range ns.stateSubs {
  607. offState := offlineState & sub.mask
  608. onState := cb.state & sub.mask
  609. if offState != onState {
  610. if start {
  611. sub.callback(cb.node, Flags{mask: offState, setup: ns.setup}, Flags{mask: onState, setup: ns.setup})
  612. } else {
  613. sub.callback(cb.node, Flags{mask: onState, setup: ns.setup}, Flags{mask: offState, setup: ns.setup})
  614. }
  615. }
  616. }
  617. for i, f := range cb.fields {
  618. if f != nil && ns.fields[i].subs != nil {
  619. for _, fsub := range ns.fields[i].subs {
  620. if start {
  621. fsub(cb.node, Flags{mask: offlineState, setup: ns.setup}, nil, f)
  622. } else {
  623. fsub(cb.node, Flags{mask: offlineState, setup: ns.setup}, f, nil)
  624. }
  625. }
  626. }
  627. }
  628. }
  629. ns.offlineCallbackList = nil
  630. }
  631. // AddTimeout adds a node state timeout associated to the given state flag(s).
  632. // After the specified time interval, the relevant states will be reset.
  633. func (ns *NodeStateMachine) AddTimeout(n *enode.Node, flags Flags, timeout time.Duration) {
  634. ns.lock.Lock()
  635. defer ns.lock.Unlock()
  636. ns.checkStarted()
  637. if ns.stopped {
  638. return
  639. }
  640. ns.addTimeout(n, ns.stateMask(flags), timeout)
  641. }
  642. // addTimeout adds a node state timeout associated to the given state flag(s).
  643. func (ns *NodeStateMachine) addTimeout(n *enode.Node, mask bitMask, timeout time.Duration) {
  644. _, node := ns.updateEnode(n)
  645. if node == nil {
  646. return
  647. }
  648. mask &= node.state
  649. if mask == 0 {
  650. return
  651. }
  652. ns.removeTimeouts(node, mask)
  653. t := &nodeStateTimeout{mask: mask}
  654. t.timer = ns.clock.AfterFunc(timeout, func() {
  655. ns.SetState(n, Flags{}, Flags{mask: t.mask, setup: ns.setup}, 0)
  656. })
  657. node.timeouts = append(node.timeouts, t)
  658. if mask&ns.saveFlags != 0 {
  659. node.dirty = true
  660. }
  661. }
  662. // removeTimeout removes node state timeouts associated to the given state flag(s).
  663. // If a timeout was associated to multiple flags which are not all included in the
  664. // specified remove mask then only the included flags are de-associated and the timer
  665. // stays active.
  666. func (ns *NodeStateMachine) removeTimeouts(node *nodeInfo, mask bitMask) {
  667. for i := 0; i < len(node.timeouts); i++ {
  668. t := node.timeouts[i]
  669. match := t.mask & mask
  670. if match == 0 {
  671. continue
  672. }
  673. t.mask -= match
  674. if t.mask != 0 {
  675. continue
  676. }
  677. t.timer.Stop()
  678. node.timeouts[i] = node.timeouts[len(node.timeouts)-1]
  679. node.timeouts = node.timeouts[:len(node.timeouts)-1]
  680. i--
  681. if match&ns.saveFlags != 0 {
  682. node.dirty = true
  683. }
  684. }
  685. }
  686. // GetField retrieves the given field of the given node
  687. func (ns *NodeStateMachine) GetField(n *enode.Node, field Field) interface{} {
  688. ns.lock.Lock()
  689. defer ns.lock.Unlock()
  690. ns.checkStarted()
  691. if ns.stopped {
  692. return nil
  693. }
  694. if _, node := ns.updateEnode(n); node != nil {
  695. return node.fields[ns.fieldIndex(field)]
  696. }
  697. return nil
  698. }
  699. // SetField sets the given field of the given node
  700. func (ns *NodeStateMachine) SetField(n *enode.Node, field Field, value interface{}) error {
  701. ns.lock.Lock()
  702. ns.checkStarted()
  703. if ns.stopped {
  704. ns.lock.Unlock()
  705. return nil
  706. }
  707. _, node := ns.updateEnode(n)
  708. if node == nil {
  709. ns.lock.Unlock()
  710. return nil
  711. }
  712. fieldIndex := ns.fieldIndex(field)
  713. f := ns.fields[fieldIndex]
  714. if value != nil && reflect.TypeOf(value) != f.ftype {
  715. log.Error("Invalid field type", "type", reflect.TypeOf(value), "required", f.ftype)
  716. ns.lock.Unlock()
  717. return errors.New("invalid field type")
  718. }
  719. oldValue := node.fields[fieldIndex]
  720. if value == oldValue {
  721. ns.lock.Unlock()
  722. return nil
  723. }
  724. node.fields[fieldIndex] = value
  725. if f.encode != nil {
  726. node.dirty = true
  727. }
  728. state := node.state
  729. ns.lock.Unlock()
  730. if len(f.subs) > 0 {
  731. for _, cb := range f.subs {
  732. cb(n, Flags{mask: state, setup: ns.setup}, oldValue, value)
  733. }
  734. }
  735. return nil
  736. }
  737. // ForEach calls the callback for each node having all of the required and none of the
  738. // disabled flags set
  739. func (ns *NodeStateMachine) ForEach(requireFlags, disableFlags Flags, cb func(n *enode.Node, state Flags)) {
  740. ns.lock.Lock()
  741. ns.checkStarted()
  742. type callback struct {
  743. node *enode.Node
  744. state bitMask
  745. }
  746. require, disable := ns.stateMask(requireFlags), ns.stateMask(disableFlags)
  747. var callbacks []callback
  748. for _, node := range ns.nodes {
  749. if node.state&require == require && node.state&disable == 0 {
  750. callbacks = append(callbacks, callback{node.node, node.state & (require | disable)})
  751. }
  752. }
  753. ns.lock.Unlock()
  754. for _, c := range callbacks {
  755. cb(c.node, Flags{mask: c.state, setup: ns.setup})
  756. }
  757. }
  758. // GetNode returns the enode currently associated with the given ID
  759. func (ns *NodeStateMachine) GetNode(id enode.ID) *enode.Node {
  760. ns.lock.Lock()
  761. defer ns.lock.Unlock()
  762. ns.checkStarted()
  763. if node := ns.nodes[id]; node != nil {
  764. return node.node
  765. }
  766. return nil
  767. }
  768. // AddLogMetrics adds logging and/or metrics for nodes entering, exiting and currently
  769. // being in a given set specified by required and disabled state flags
  770. func (ns *NodeStateMachine) AddLogMetrics(requireFlags, disableFlags Flags, name string, inMeter, outMeter metrics.Meter, gauge metrics.Gauge) {
  771. var count int64
  772. ns.SubscribeState(requireFlags.Or(disableFlags), func(n *enode.Node, oldState, newState Flags) {
  773. oldMatch := oldState.HasAll(requireFlags) && oldState.HasNone(disableFlags)
  774. newMatch := newState.HasAll(requireFlags) && newState.HasNone(disableFlags)
  775. if newMatch == oldMatch {
  776. return
  777. }
  778. if newMatch {
  779. count++
  780. if name != "" {
  781. log.Debug("Node entered", "set", name, "id", n.ID(), "count", count)
  782. }
  783. if inMeter != nil {
  784. inMeter.Mark(1)
  785. }
  786. } else {
  787. count--
  788. if name != "" {
  789. log.Debug("Node left", "set", name, "id", n.ID(), "count", count)
  790. }
  791. if outMeter != nil {
  792. outMeter.Mark(1)
  793. }
  794. }
  795. if gauge != nil {
  796. gauge.Update(count)
  797. }
  798. })
  799. }