node.go 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306
  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. // Package node represents the Ethereum protocol stack container.
  17. package node
  18. import (
  19. "errors"
  20. "os"
  21. "path/filepath"
  22. "reflect"
  23. "sync"
  24. "syscall"
  25. "github.com/ethereum/go-ethereum/event"
  26. "github.com/ethereum/go-ethereum/internal/debug"
  27. "github.com/ethereum/go-ethereum/p2p"
  28. "github.com/ethereum/go-ethereum/rpc"
  29. )
  30. var (
  31. ErrDatadirUsed = errors.New("datadir already used")
  32. ErrNodeStopped = errors.New("node not started")
  33. ErrNodeRunning = errors.New("node already running")
  34. ErrServiceUnknown = errors.New("unknown service")
  35. datadirInUseErrnos = map[uint]bool{11: true, 32: true, 35: true}
  36. )
  37. // Node represents a P2P node into which arbitrary (uniquely typed) services might
  38. // be registered.
  39. type Node struct {
  40. datadir string // Path to the currently used data directory
  41. eventmux *event.TypeMux // Event multiplexer used between the services of a stack
  42. serverConfig *p2p.Server // Configuration of the underlying P2P networking layer
  43. server *p2p.Server // Currently running P2P networking layer
  44. serviceFuncs []ServiceConstructor // Service constructors (in dependency order)
  45. services map[reflect.Type]Service // Currently running services
  46. stop chan struct{} // Channel to wait for termination notifications
  47. lock sync.RWMutex
  48. }
  49. // New creates a new P2P node, ready for protocol registration.
  50. func New(conf *Config) (*Node, error) {
  51. // Ensure the data directory exists, failing if it cannot be created
  52. if conf.DataDir != "" {
  53. if err := os.MkdirAll(conf.DataDir, 0700); err != nil {
  54. return nil, err
  55. }
  56. }
  57. // Assemble the networking layer and the node itself
  58. nodeDbPath := ""
  59. if conf.DataDir != "" {
  60. nodeDbPath = filepath.Join(conf.DataDir, datadirNodeDatabase)
  61. }
  62. return &Node{
  63. datadir: conf.DataDir,
  64. serverConfig: &p2p.Server{
  65. PrivateKey: conf.NodeKey(),
  66. Name: conf.Name,
  67. Discovery: !conf.NoDiscovery,
  68. BootstrapNodes: conf.BootstrapNodes,
  69. StaticNodes: conf.StaticNodes(),
  70. TrustedNodes: conf.TrusterNodes(),
  71. NodeDatabase: nodeDbPath,
  72. ListenAddr: conf.ListenAddr,
  73. NAT: conf.NAT,
  74. Dialer: conf.Dialer,
  75. NoDial: conf.NoDial,
  76. MaxPeers: conf.MaxPeers,
  77. MaxPendingPeers: conf.MaxPendingPeers,
  78. },
  79. serviceFuncs: []ServiceConstructor{},
  80. eventmux: new(event.TypeMux),
  81. }, nil
  82. }
  83. // Register injects a new service into the node's stack. The service created by
  84. // the passed constructor must be unique in its type with regard to sibling ones.
  85. func (n *Node) Register(constructor ServiceConstructor) error {
  86. n.lock.Lock()
  87. defer n.lock.Unlock()
  88. if n.server != nil {
  89. return ErrNodeRunning
  90. }
  91. n.serviceFuncs = append(n.serviceFuncs, constructor)
  92. return nil
  93. }
  94. // Start create a live P2P node and starts running it.
  95. func (n *Node) Start() error {
  96. n.lock.Lock()
  97. defer n.lock.Unlock()
  98. // Short circuit if the node's already running
  99. if n.server != nil {
  100. return ErrNodeRunning
  101. }
  102. // Otherwise copy and specialize the P2P configuration
  103. running := new(p2p.Server)
  104. *running = *n.serverConfig
  105. services := make(map[reflect.Type]Service)
  106. for _, constructor := range n.serviceFuncs {
  107. // Create a new context for the particular service
  108. ctx := &ServiceContext{
  109. datadir: n.datadir,
  110. services: make(map[reflect.Type]Service),
  111. EventMux: n.eventmux,
  112. }
  113. for kind, s := range services { // copy needed for threaded access
  114. ctx.services[kind] = s
  115. }
  116. // Construct and save the service
  117. service, err := constructor(ctx)
  118. if err != nil {
  119. return err
  120. }
  121. kind := reflect.TypeOf(service)
  122. if _, exists := services[kind]; exists {
  123. return &DuplicateServiceError{Kind: kind}
  124. }
  125. services[kind] = service
  126. }
  127. // Gather the protocols and start the freshly assembled P2P server
  128. for _, service := range services {
  129. running.Protocols = append(running.Protocols, service.Protocols()...)
  130. }
  131. if err := running.Start(); err != nil {
  132. if errno, ok := err.(syscall.Errno); ok && datadirInUseErrnos[uint(errno)] {
  133. return ErrDatadirUsed
  134. }
  135. return err
  136. }
  137. // Start each of the services
  138. started := []reflect.Type{}
  139. for kind, service := range services {
  140. // Start the next service, stopping all previous upon failure
  141. if err := service.Start(running); err != nil {
  142. for _, kind := range started {
  143. services[kind].Stop()
  144. }
  145. running.Stop()
  146. return err
  147. }
  148. // Mark the service started for potential cleanup
  149. started = append(started, kind)
  150. }
  151. // Finish initializing the startup
  152. n.services = services
  153. n.server = running
  154. n.stop = make(chan struct{})
  155. return nil
  156. }
  157. // Stop terminates a running node along with all it's services. In the node was
  158. // not started, an error is returned.
  159. func (n *Node) Stop() error {
  160. n.lock.Lock()
  161. defer n.lock.Unlock()
  162. // Short circuit if the node's not running
  163. if n.server == nil {
  164. return ErrNodeStopped
  165. }
  166. // Otherwise terminate all the services and the P2P server too
  167. failure := &StopError{
  168. Services: make(map[reflect.Type]error),
  169. }
  170. for kind, service := range n.services {
  171. if err := service.Stop(); err != nil {
  172. failure.Services[kind] = err
  173. }
  174. }
  175. n.server.Stop()
  176. n.services = nil
  177. n.server = nil
  178. close(n.stop)
  179. if len(failure.Services) > 0 {
  180. return failure
  181. }
  182. return nil
  183. }
  184. // Wait blocks the thread until the node is stopped. If the node is not running
  185. // at the time of invocation, the method immediately returns.
  186. func (n *Node) Wait() {
  187. n.lock.RLock()
  188. if n.server == nil {
  189. return
  190. }
  191. stop := n.stop
  192. n.lock.RUnlock()
  193. <-stop
  194. }
  195. // Restart terminates a running node and boots up a new one in its place. If the
  196. // node isn't running, an error is returned.
  197. func (n *Node) Restart() error {
  198. if err := n.Stop(); err != nil {
  199. return err
  200. }
  201. if err := n.Start(); err != nil {
  202. return err
  203. }
  204. return nil
  205. }
  206. // Server retrieves the currently running P2P network layer. This method is meant
  207. // only to inspect fields of the currently running server, life cycle management
  208. // should be left to this Node entity.
  209. func (n *Node) Server() *p2p.Server {
  210. n.lock.RLock()
  211. defer n.lock.RUnlock()
  212. return n.server
  213. }
  214. // Service retrieves a currently running service registered of a specific type.
  215. func (n *Node) Service(service interface{}) error {
  216. n.lock.RLock()
  217. defer n.lock.RUnlock()
  218. // Short circuit if the node's not running
  219. if n.server == nil {
  220. return ErrNodeStopped
  221. }
  222. // Otherwise try to find the service to return
  223. element := reflect.ValueOf(service).Elem()
  224. if running, ok := n.services[element.Type()]; ok {
  225. element.Set(reflect.ValueOf(running))
  226. return nil
  227. }
  228. return ErrServiceUnknown
  229. }
  230. // DataDir retrieves the current datadir used by the protocol stack.
  231. func (n *Node) DataDir() string {
  232. return n.datadir
  233. }
  234. // EventMux retrieves the event multiplexer used by all the network services in
  235. // the current protocol stack.
  236. func (n *Node) EventMux() *event.TypeMux {
  237. return n.eventmux
  238. }
  239. // APIs returns the collection of RPC descriptor this node offers. This method
  240. // is just a quick placeholder passthrough for the RPC update, which in the next
  241. // step will be fully integrated into the node itself.
  242. func (n *Node) APIs() []rpc.API {
  243. // Define all the APIs owned by the node itself
  244. apis := []rpc.API{
  245. {
  246. Namespace: "admin",
  247. Version: "1.0",
  248. Service: NewPrivateAdminAPI(n),
  249. }, {
  250. Namespace: "admin",
  251. Version: "1.0",
  252. Service: NewPublicAdminAPI(n),
  253. Public: true,
  254. }, {
  255. Namespace: "debug",
  256. Version: "1.0",
  257. Service: debug.Handler,
  258. }, {
  259. Namespace: "debug",
  260. Version: "1.0",
  261. Service: NewPublicDebugAPI(n),
  262. Public: true,
  263. }, {
  264. Namespace: "web3",
  265. Version: "1.0",
  266. Service: NewPublicWeb3API(n),
  267. Public: true,
  268. },
  269. }
  270. // Inject all the APIs owned by various services
  271. for _, api := range n.services {
  272. apis = append(apis, api.APIs()...)
  273. }
  274. return apis
  275. }