node.go 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266
  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. // Package node represents the Ethereum protocol stack container.
  17. package node
  18. import (
  19. "errors"
  20. "os"
  21. "path/filepath"
  22. "reflect"
  23. "sync"
  24. "syscall"
  25. "github.com/ethereum/go-ethereum/event"
  26. "github.com/ethereum/go-ethereum/p2p"
  27. )
  28. var (
  29. ErrDatadirUsed = errors.New("datadir already used")
  30. ErrNodeStopped = errors.New("node not started")
  31. ErrNodeRunning = errors.New("node already running")
  32. ErrServiceUnknown = errors.New("unknown service")
  33. datadirInUseErrnos = map[uint]bool{11: true, 32: true, 35: true}
  34. )
  35. // Node represents a P2P node into which arbitrary (uniquely typed) services might
  36. // be registered.
  37. type Node struct {
  38. datadir string // Path to the currently used data directory
  39. eventmux *event.TypeMux // Event multiplexer used between the services of a stack
  40. serverConfig *p2p.Server // Configuration of the underlying P2P networking layer
  41. server *p2p.Server // Currently running P2P networking layer
  42. serviceFuncs []ServiceConstructor // Service constructors (in dependency order)
  43. services map[reflect.Type]Service // Currently running services
  44. stop chan struct{} // Channel to wait for termination notifications
  45. lock sync.RWMutex
  46. }
  47. // New creates a new P2P node, ready for protocol registration.
  48. func New(conf *Config) (*Node, error) {
  49. // Ensure the data directory exists, failing if it cannot be created
  50. if conf.DataDir != "" {
  51. if err := os.MkdirAll(conf.DataDir, 0700); err != nil {
  52. return nil, err
  53. }
  54. }
  55. // Assemble the networking layer and the node itself
  56. nodeDbPath := ""
  57. if conf.DataDir != "" {
  58. nodeDbPath = filepath.Join(conf.DataDir, datadirNodeDatabase)
  59. }
  60. return &Node{
  61. datadir: conf.DataDir,
  62. serverConfig: &p2p.Server{
  63. PrivateKey: conf.NodeKey(),
  64. Name: conf.Name,
  65. Discovery: !conf.NoDiscovery,
  66. BootstrapNodes: conf.BootstrapNodes,
  67. StaticNodes: conf.StaticNodes(),
  68. TrustedNodes: conf.TrusterNodes(),
  69. NodeDatabase: nodeDbPath,
  70. ListenAddr: conf.ListenAddr,
  71. NAT: conf.NAT,
  72. Dialer: conf.Dialer,
  73. NoDial: conf.NoDial,
  74. MaxPeers: conf.MaxPeers,
  75. MaxPendingPeers: conf.MaxPendingPeers,
  76. },
  77. serviceFuncs: []ServiceConstructor{},
  78. eventmux: new(event.TypeMux),
  79. }, nil
  80. }
  81. // Register injects a new service into the node's stack. The service created by
  82. // the passed constructor must be unique in its type with regard to sibling ones.
  83. func (n *Node) Register(constructor ServiceConstructor) error {
  84. n.lock.Lock()
  85. defer n.lock.Unlock()
  86. if n.server != nil {
  87. return ErrNodeRunning
  88. }
  89. n.serviceFuncs = append(n.serviceFuncs, constructor)
  90. return nil
  91. }
  92. // Start create a live P2P node and starts running it.
  93. func (n *Node) Start() error {
  94. n.lock.Lock()
  95. defer n.lock.Unlock()
  96. // Short circuit if the node's already running
  97. if n.server != nil {
  98. return ErrNodeRunning
  99. }
  100. // Otherwise copy and specialize the P2P configuration
  101. running := new(p2p.Server)
  102. *running = *n.serverConfig
  103. services := make(map[reflect.Type]Service)
  104. for _, constructor := range n.serviceFuncs {
  105. // Create a new context for the particular service
  106. ctx := &ServiceContext{
  107. datadir: n.datadir,
  108. services: make(map[reflect.Type]Service),
  109. EventMux: n.eventmux,
  110. }
  111. for kind, s := range services { // copy needed for threaded access
  112. ctx.services[kind] = s
  113. }
  114. // Construct and save the service
  115. service, err := constructor(ctx)
  116. if err != nil {
  117. return err
  118. }
  119. kind := reflect.TypeOf(service)
  120. if _, exists := services[kind]; exists {
  121. return &DuplicateServiceError{Kind: kind}
  122. }
  123. services[kind] = service
  124. }
  125. // Gather the protocols and start the freshly assembled P2P server
  126. for _, service := range services {
  127. running.Protocols = append(running.Protocols, service.Protocols()...)
  128. }
  129. if err := running.Start(); err != nil {
  130. if errno, ok := err.(syscall.Errno); ok && datadirInUseErrnos[uint(errno)] {
  131. return ErrDatadirUsed
  132. }
  133. return err
  134. }
  135. // Start each of the services
  136. started := []reflect.Type{}
  137. for kind, service := range services {
  138. // Start the next service, stopping all previous upon failure
  139. if err := service.Start(running); err != nil {
  140. for _, kind := range started {
  141. services[kind].Stop()
  142. }
  143. running.Stop()
  144. return err
  145. }
  146. // Mark the service started for potential cleanup
  147. started = append(started, kind)
  148. }
  149. // Finish initializing the startup
  150. n.services = services
  151. n.server = running
  152. n.stop = make(chan struct{})
  153. return nil
  154. }
  155. // Stop terminates a running node along with all it's services. In the node was
  156. // not started, an error is returned.
  157. func (n *Node) Stop() error {
  158. n.lock.Lock()
  159. defer n.lock.Unlock()
  160. // Short circuit if the node's not running
  161. if n.server == nil {
  162. return ErrNodeStopped
  163. }
  164. // Otherwise terminate all the services and the P2P server too
  165. failure := &StopError{
  166. Services: make(map[reflect.Type]error),
  167. }
  168. for kind, service := range n.services {
  169. if err := service.Stop(); err != nil {
  170. failure.Services[kind] = err
  171. }
  172. }
  173. n.server.Stop()
  174. n.services = nil
  175. n.server = nil
  176. close(n.stop)
  177. if len(failure.Services) > 0 {
  178. return failure
  179. }
  180. return nil
  181. }
  182. // Wait blocks the thread until the node is stopped. If the node is not running
  183. // at the time of invocation, the method immediately returns.
  184. func (n *Node) Wait() {
  185. n.lock.RLock()
  186. if n.server == nil {
  187. return
  188. }
  189. stop := n.stop
  190. n.lock.RUnlock()
  191. <-stop
  192. }
  193. // Restart terminates a running node and boots up a new one in its place. If the
  194. // node isn't running, an error is returned.
  195. func (n *Node) Restart() error {
  196. if err := n.Stop(); err != nil {
  197. return err
  198. }
  199. if err := n.Start(); err != nil {
  200. return err
  201. }
  202. return nil
  203. }
  204. // Server retrieves the currently running P2P network layer. This method is meant
  205. // only to inspect fields of the currently running server, life cycle management
  206. // should be left to this Node entity.
  207. func (n *Node) Server() *p2p.Server {
  208. n.lock.RLock()
  209. defer n.lock.RUnlock()
  210. return n.server
  211. }
  212. // Service retrieves a currently running service registered of a specific type.
  213. func (n *Node) Service(service interface{}) error {
  214. n.lock.RLock()
  215. defer n.lock.RUnlock()
  216. // Short circuit if the node's not running
  217. if n.server == nil {
  218. return ErrNodeStopped
  219. }
  220. // Otherwise try to find the service to return
  221. element := reflect.ValueOf(service).Elem()
  222. if running, ok := n.services[element.Type()]; ok {
  223. element.Set(reflect.ValueOf(running))
  224. return nil
  225. }
  226. return ErrServiceUnknown
  227. }
  228. // DataDir retrieves the current datadir used by the protocol stack.
  229. func (n *Node) DataDir() string {
  230. return n.datadir
  231. }
  232. // EventMux retrieves the event multiplexer used by all the network services in
  233. // the current protocol stack.
  234. func (n *Node) EventMux() *event.TypeMux {
  235. return n.eventmux
  236. }