node.go 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276
  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. // Package node represents the Ethereum protocol stack container.
  17. package node
  18. import (
  19. "errors"
  20. "os"
  21. "path/filepath"
  22. "reflect"
  23. "sync"
  24. "syscall"
  25. "github.com/ethereum/go-ethereum/event"
  26. "github.com/ethereum/go-ethereum/p2p"
  27. rpc "github.com/ethereum/go-ethereum/rpc/v2"
  28. )
  29. var (
  30. ErrDatadirUsed = errors.New("datadir already used")
  31. ErrNodeStopped = errors.New("node not started")
  32. ErrNodeRunning = errors.New("node already running")
  33. ErrServiceUnknown = errors.New("unknown service")
  34. datadirInUseErrnos = map[uint]bool{11: true, 32: true, 35: true}
  35. )
  36. // Node represents a P2P node into which arbitrary (uniquely typed) services might
  37. // be registered.
  38. type Node struct {
  39. datadir string // Path to the currently used data directory
  40. eventmux *event.TypeMux // Event multiplexer used between the services of a stack
  41. serverConfig *p2p.Server // Configuration of the underlying P2P networking layer
  42. server *p2p.Server // Currently running P2P networking layer
  43. serviceFuncs []ServiceConstructor // Service constructors (in dependency order)
  44. services map[reflect.Type]Service // Currently running services
  45. stop chan struct{} // Channel to wait for termination notifications
  46. lock sync.RWMutex
  47. }
  48. // New creates a new P2P node, ready for protocol registration.
  49. func New(conf *Config) (*Node, error) {
  50. // Ensure the data directory exists, failing if it cannot be created
  51. if conf.DataDir != "" {
  52. if err := os.MkdirAll(conf.DataDir, 0700); err != nil {
  53. return nil, err
  54. }
  55. }
  56. // Assemble the networking layer and the node itself
  57. nodeDbPath := ""
  58. if conf.DataDir != "" {
  59. nodeDbPath = filepath.Join(conf.DataDir, datadirNodeDatabase)
  60. }
  61. return &Node{
  62. datadir: conf.DataDir,
  63. serverConfig: &p2p.Server{
  64. PrivateKey: conf.NodeKey(),
  65. Name: conf.Name,
  66. Discovery: !conf.NoDiscovery,
  67. BootstrapNodes: conf.BootstrapNodes,
  68. StaticNodes: conf.StaticNodes(),
  69. TrustedNodes: conf.TrusterNodes(),
  70. NodeDatabase: nodeDbPath,
  71. ListenAddr: conf.ListenAddr,
  72. NAT: conf.NAT,
  73. Dialer: conf.Dialer,
  74. NoDial: conf.NoDial,
  75. MaxPeers: conf.MaxPeers,
  76. MaxPendingPeers: conf.MaxPendingPeers,
  77. },
  78. serviceFuncs: []ServiceConstructor{},
  79. eventmux: new(event.TypeMux),
  80. }, nil
  81. }
  82. // Register injects a new service into the node's stack. The service created by
  83. // the passed constructor must be unique in its type with regard to sibling ones.
  84. func (n *Node) Register(constructor ServiceConstructor) error {
  85. n.lock.Lock()
  86. defer n.lock.Unlock()
  87. if n.server != nil {
  88. return ErrNodeRunning
  89. }
  90. n.serviceFuncs = append(n.serviceFuncs, constructor)
  91. return nil
  92. }
  93. // Start create a live P2P node and starts running it.
  94. func (n *Node) Start() error {
  95. n.lock.Lock()
  96. defer n.lock.Unlock()
  97. // Short circuit if the node's already running
  98. if n.server != nil {
  99. return ErrNodeRunning
  100. }
  101. // Otherwise copy and specialize the P2P configuration
  102. running := new(p2p.Server)
  103. *running = *n.serverConfig
  104. services := make(map[reflect.Type]Service)
  105. for _, constructor := range n.serviceFuncs {
  106. // Create a new context for the particular service
  107. ctx := &ServiceContext{
  108. datadir: n.datadir,
  109. services: make(map[reflect.Type]Service),
  110. EventMux: n.eventmux,
  111. }
  112. for kind, s := range services { // copy needed for threaded access
  113. ctx.services[kind] = s
  114. }
  115. // Construct and save the service
  116. service, err := constructor(ctx)
  117. if err != nil {
  118. return err
  119. }
  120. kind := reflect.TypeOf(service)
  121. if _, exists := services[kind]; exists {
  122. return &DuplicateServiceError{Kind: kind}
  123. }
  124. services[kind] = service
  125. }
  126. // Gather the protocols and start the freshly assembled P2P server
  127. for _, service := range services {
  128. running.Protocols = append(running.Protocols, service.Protocols()...)
  129. }
  130. if err := running.Start(); err != nil {
  131. if errno, ok := err.(syscall.Errno); ok && datadirInUseErrnos[uint(errno)] {
  132. return ErrDatadirUsed
  133. }
  134. return err
  135. }
  136. // Start each of the services
  137. started := []reflect.Type{}
  138. for kind, service := range services {
  139. // Start the next service, stopping all previous upon failure
  140. if err := service.Start(running); err != nil {
  141. for _, kind := range started {
  142. services[kind].Stop()
  143. }
  144. running.Stop()
  145. return err
  146. }
  147. // Mark the service started for potential cleanup
  148. started = append(started, kind)
  149. }
  150. // Finish initializing the startup
  151. n.services = services
  152. n.server = running
  153. n.stop = make(chan struct{})
  154. return nil
  155. }
  156. // Stop terminates a running node along with all it's services. In the node was
  157. // not started, an error is returned.
  158. func (n *Node) Stop() error {
  159. n.lock.Lock()
  160. defer n.lock.Unlock()
  161. // Short circuit if the node's not running
  162. if n.server == nil {
  163. return ErrNodeStopped
  164. }
  165. // Otherwise terminate all the services and the P2P server too
  166. failure := &StopError{
  167. Services: make(map[reflect.Type]error),
  168. }
  169. for kind, service := range n.services {
  170. if err := service.Stop(); err != nil {
  171. failure.Services[kind] = err
  172. }
  173. }
  174. n.server.Stop()
  175. n.services = nil
  176. n.server = nil
  177. close(n.stop)
  178. if len(failure.Services) > 0 {
  179. return failure
  180. }
  181. return nil
  182. }
  183. // Wait blocks the thread until the node is stopped. If the node is not running
  184. // at the time of invocation, the method immediately returns.
  185. func (n *Node) Wait() {
  186. n.lock.RLock()
  187. if n.server == nil {
  188. return
  189. }
  190. stop := n.stop
  191. n.lock.RUnlock()
  192. <-stop
  193. }
  194. // Restart terminates a running node and boots up a new one in its place. If the
  195. // node isn't running, an error is returned.
  196. func (n *Node) Restart() error {
  197. if err := n.Stop(); err != nil {
  198. return err
  199. }
  200. if err := n.Start(); err != nil {
  201. return err
  202. }
  203. return nil
  204. }
  205. // Server retrieves the currently running P2P network layer. This method is meant
  206. // only to inspect fields of the currently running server, life cycle management
  207. // should be left to this Node entity.
  208. func (n *Node) Server() *p2p.Server {
  209. n.lock.RLock()
  210. defer n.lock.RUnlock()
  211. return n.server
  212. }
  213. // Service retrieves a currently running service registered of a specific type.
  214. func (n *Node) Service(service interface{}) error {
  215. n.lock.RLock()
  216. defer n.lock.RUnlock()
  217. // Short circuit if the node's not running
  218. if n.server == nil {
  219. return ErrNodeStopped
  220. }
  221. // Otherwise try to find the service to return
  222. element := reflect.ValueOf(service).Elem()
  223. if running, ok := n.services[element.Type()]; ok {
  224. element.Set(reflect.ValueOf(running))
  225. return nil
  226. }
  227. return ErrServiceUnknown
  228. }
  229. // DataDir retrieves the current datadir used by the protocol stack.
  230. func (n *Node) DataDir() string {
  231. return n.datadir
  232. }
  233. // EventMux retrieves the event multiplexer used by all the network services in
  234. // the current protocol stack.
  235. func (n *Node) EventMux() *event.TypeMux {
  236. return n.eventmux
  237. }
  238. // RPCAPIs returns the collection of RPC descriptor this node offers
  239. func (n *Node) RPCAPIs() []rpc.API {
  240. var apis []rpc.API
  241. for _, api := range n.services {
  242. apis = append(apis, api.APIs()...)
  243. }
  244. return apis
  245. }