node.go 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314
  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. // Package node represents the Ethereum protocol stack container.
  17. package node
  18. import (
  19. "errors"
  20. "os"
  21. "path/filepath"
  22. "reflect"
  23. "sync"
  24. "syscall"
  25. "github.com/ethereum/go-ethereum/event"
  26. "github.com/ethereum/go-ethereum/p2p"
  27. )
  28. var (
  29. ErrDatadirUsed = errors.New("datadir already used")
  30. ErrNodeStopped = errors.New("node not started")
  31. ErrNodeRunning = errors.New("node already running")
  32. ErrServiceUnknown = errors.New("service not registered")
  33. ErrServiceRegistered = errors.New("service already registered")
  34. datadirInUseErrnos = map[uint]bool{11: true, 32: true, 35: true}
  35. )
  36. // Node represents a P2P node into which arbitrary services might be registered.
  37. type Node struct {
  38. datadir string // Path to the currently used data directory
  39. eventmux *event.TypeMux // Event multiplexer used between the services of a stack
  40. serverConfig *p2p.Server // Configuration of the underlying P2P networking layer
  41. server *p2p.Server // Currently running P2P networking layer
  42. serviceIndex map[string]ServiceConstructor // Set of services currently registered in the node
  43. serviceOrder []string // Service construction order to handle dependencies
  44. services map[string]Service // Currently running services
  45. stop chan struct{} // Channel to wait for termination notifications
  46. lock sync.RWMutex
  47. }
  48. // New creates a new P2P node, ready for protocol registration.
  49. func New(conf *Config) (*Node, error) {
  50. // Ensure the data directory exists, failing if it cannot be created
  51. if conf.DataDir != "" {
  52. if err := os.MkdirAll(conf.DataDir, 0700); err != nil {
  53. return nil, err
  54. }
  55. }
  56. // Assemble the networking layer and the node itself
  57. nodeDbPath := ""
  58. if conf.DataDir != "" {
  59. nodeDbPath = filepath.Join(conf.DataDir, datadirNodeDatabase)
  60. }
  61. return &Node{
  62. datadir: conf.DataDir,
  63. serverConfig: &p2p.Server{
  64. PrivateKey: conf.NodeKey(),
  65. Name: conf.Name,
  66. Discovery: !conf.NoDiscovery,
  67. BootstrapNodes: conf.BootstrapNodes,
  68. StaticNodes: conf.StaticNodes(),
  69. TrustedNodes: conf.TrusterNodes(),
  70. NodeDatabase: nodeDbPath,
  71. ListenAddr: conf.ListenAddr,
  72. NAT: conf.NAT,
  73. Dialer: conf.Dialer,
  74. NoDial: conf.NoDial,
  75. MaxPeers: conf.MaxPeers,
  76. MaxPendingPeers: conf.MaxPendingPeers,
  77. },
  78. serviceIndex: make(map[string]ServiceConstructor),
  79. serviceOrder: []string{},
  80. eventmux: new(event.TypeMux),
  81. }, nil
  82. }
  83. // Register injects a new service into the node's stack.
  84. func (n *Node) Register(id string, constructor ServiceConstructor) error {
  85. n.lock.Lock()
  86. defer n.lock.Unlock()
  87. // Short circuit if the node is running or if the id is taken
  88. if n.server != nil {
  89. return ErrNodeRunning
  90. }
  91. if _, ok := n.serviceIndex[id]; ok {
  92. return ErrServiceRegistered
  93. }
  94. // Otherwise register the service and return
  95. n.serviceOrder = append(n.serviceOrder, id)
  96. n.serviceIndex[id] = constructor
  97. return nil
  98. }
  99. // Unregister removes a service from a node's stack. If the node is currently
  100. // running, an error will be returned.
  101. func (n *Node) Unregister(id string) error {
  102. n.lock.Lock()
  103. defer n.lock.Unlock()
  104. // Short circuit if the node is running, or if the service is unknown
  105. if n.server != nil {
  106. return ErrNodeRunning
  107. }
  108. if _, ok := n.serviceIndex[id]; !ok {
  109. return ErrServiceUnknown
  110. }
  111. // Otherwise drop the service and return
  112. delete(n.serviceIndex, id)
  113. for i, service := range n.serviceOrder {
  114. if service == id {
  115. n.serviceOrder = append(n.serviceOrder[:i], n.serviceOrder[i+1:]...)
  116. break
  117. }
  118. }
  119. return nil
  120. }
  121. // Start create a live P2P node and starts running it.
  122. func (n *Node) Start() error {
  123. n.lock.Lock()
  124. defer n.lock.Unlock()
  125. // Short circuit if the node's already running
  126. if n.server != nil {
  127. return ErrNodeRunning
  128. }
  129. // Otherwise copy and specialize the P2P configuration
  130. running := new(p2p.Server)
  131. *running = *n.serverConfig
  132. services := make(map[string]Service)
  133. for _, id := range n.serviceOrder {
  134. constructor := n.serviceIndex[id]
  135. // Create a new context for the particular service
  136. ctx := &ServiceContext{
  137. datadir: n.datadir,
  138. services: make(map[string]Service),
  139. EventMux: n.eventmux,
  140. }
  141. for id, s := range services { // copy needed for threaded access
  142. ctx.services[id] = s
  143. }
  144. // Construct and save the service
  145. service, err := constructor(ctx)
  146. if err != nil {
  147. return err
  148. }
  149. services[id] = service
  150. }
  151. // Gather the protocols and start the freshly assembled P2P server
  152. for _, service := range services {
  153. running.Protocols = append(running.Protocols, service.Protocols()...)
  154. }
  155. if err := running.Start(); err != nil {
  156. if errno, ok := err.(syscall.Errno); ok && datadirInUseErrnos[uint(errno)] {
  157. return ErrDatadirUsed
  158. }
  159. return err
  160. }
  161. // Start each of the services
  162. started := []string{}
  163. for id, service := range services {
  164. // Start the next service, stopping all previous upon failure
  165. if err := service.Start(running); err != nil {
  166. for _, id := range started {
  167. services[id].Stop()
  168. }
  169. running.Stop()
  170. return err
  171. }
  172. // Mark the service started for potential cleanup
  173. started = append(started, id)
  174. }
  175. // Finish initializing the startup
  176. n.services = services
  177. n.server = running
  178. n.stop = make(chan struct{})
  179. return nil
  180. }
  181. // Stop terminates a running node along with all it's services. In the node was
  182. // not started, an error is returned.
  183. func (n *Node) Stop() error {
  184. n.lock.Lock()
  185. defer n.lock.Unlock()
  186. // Short circuit if the node's not running
  187. if n.server == nil {
  188. return ErrNodeStopped
  189. }
  190. // Otherwise terminate all the services and the P2P server too
  191. failure := &StopError{
  192. Services: make(map[string]error),
  193. }
  194. for id, service := range n.services {
  195. if err := service.Stop(); err != nil {
  196. failure.Services[id] = err
  197. }
  198. }
  199. n.server.Stop()
  200. n.services = nil
  201. n.server = nil
  202. close(n.stop)
  203. if len(failure.Services) > 0 {
  204. return failure
  205. }
  206. return nil
  207. }
  208. // Wait blocks the thread until the node is stopped. If the node is not running
  209. // at the time of invocation, the method immediately returns.
  210. func (n *Node) Wait() {
  211. n.lock.RLock()
  212. if n.server == nil {
  213. return
  214. }
  215. stop := n.stop
  216. n.lock.RUnlock()
  217. <-stop
  218. }
  219. // Restart terminates a running node and boots up a new one in its place. If the
  220. // node isn't running, an error is returned.
  221. func (n *Node) Restart() error {
  222. if err := n.Stop(); err != nil {
  223. return err
  224. }
  225. if err := n.Start(); err != nil {
  226. return err
  227. }
  228. return nil
  229. }
  230. // Server retrieves the currently running P2P network layer. This method is meant
  231. // only to inspect fields of the currently running server, life cycle management
  232. // should be left to this Node entity.
  233. func (n *Node) Server() *p2p.Server {
  234. n.lock.RLock()
  235. defer n.lock.RUnlock()
  236. return n.server
  237. }
  238. // Service retrieves a currently running service registered under a given id.
  239. func (n *Node) Service(id string) Service {
  240. n.lock.RLock()
  241. defer n.lock.RUnlock()
  242. // Short circuit if the node's not running
  243. if n.server == nil {
  244. return nil
  245. }
  246. return n.services[id]
  247. }
  248. // SingletonService retrieves a currently running service using a specific type
  249. // implementing the Service interface. This is a utility function for scenarios
  250. // where it is known that only one instance of a given service type is running,
  251. // allowing to access services without needing to know their specific id with
  252. // which they were registered. Note, this method uses reflection, so do not run
  253. // in a tight loop.
  254. func (n *Node) SingletonService(service interface{}) (string, error) {
  255. n.lock.RLock()
  256. defer n.lock.RUnlock()
  257. // Short circuit if the node's not running
  258. if n.server == nil {
  259. return "", ErrServiceUnknown
  260. }
  261. // Otherwise try to find the service to return
  262. for id, running := range n.services {
  263. if reflect.TypeOf(running) == reflect.ValueOf(service).Elem().Type() {
  264. reflect.ValueOf(service).Elem().Set(reflect.ValueOf(running))
  265. return id, nil
  266. }
  267. }
  268. return "", ErrServiceUnknown
  269. }
  270. // DataDir retrieves the current datadir used by the protocol stack.
  271. func (n *Node) DataDir() string {
  272. return n.datadir
  273. }
  274. // EventMux retrieves the event multiplexer used by all the network services in
  275. // the current protocol stack.
  276. func (n *Node) EventMux() *event.TypeMux {
  277. return n.eventmux
  278. }