  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. // Package node represents the Ethereum protocol stack container.
  17. package node
  18. import (
  19. "errors"
  20. "net"
  21. "os"
  22. "path/filepath"
  23. "reflect"
  24. "sync"
  25. "syscall"
  26. "github.com/ethereum/go-ethereum/event"
  27. "github.com/ethereum/go-ethereum/internal/debug"
  28. "github.com/ethereum/go-ethereum/logger"
  29. "github.com/ethereum/go-ethereum/logger/glog"
  30. "github.com/ethereum/go-ethereum/p2p"
  31. "github.com/ethereum/go-ethereum/rpc"
  32. )
  33. var (
  34. ErrDatadirUsed = errors.New("datadir already used")
  35. ErrNodeStopped = errors.New("node not started")
  36. ErrNodeRunning = errors.New("node already running")
  37. ErrServiceUnknown = errors.New("unknown service")
  38. datadirInUseErrnos = map[uint]bool{11: true, 32: true, 35: true}
  39. )
  40. // Node represents a P2P node into which arbitrary (uniquely typed) services might
  41. // be registered.
  42. type Node struct {
  43. datadir string // Path to the currently used data directory
  44. eventmux *event.TypeMux // Event multiplexer used between the services of a stack
  45. serverConfig *p2p.Server // Configuration of the underlying P2P networking layer
  46. server *p2p.Server // Currently running P2P networking layer
  47. serviceFuncs []ServiceConstructor // Service constructors (in dependency order)
  48. services map[reflect.Type]Service // Currently running services
  49. ipcEndpoint string // IPC endpoint to listen at (empty = IPC disabled)
  50. ipcListener net.Listener // IPC RPC listener socket to serve API requests
  51. ipcHandler *rpc.Server // IPC RPC request handler to process the API requests
  52. stop chan struct{} // Channel to wait for termination notifications
  53. lock sync.RWMutex
  54. }
  55. // New creates a new P2P node, ready for protocol registration.
  56. func New(conf *Config) (*Node, error) {
  57. // Ensure the data directory exists, failing if it cannot be created
  58. if conf.DataDir != "" {
  59. if err := os.MkdirAll(conf.DataDir, 0700); err != nil {
  60. return nil, err
  61. }
  62. }
  63. // Assemble the networking layer and the node itself
  64. nodeDbPath := ""
  65. if conf.DataDir != "" {
  66. nodeDbPath = filepath.Join(conf.DataDir, datadirNodeDatabase)
  67. }
  68. return &Node{
  69. datadir: conf.DataDir,
  70. serverConfig: &p2p.Server{
  71. PrivateKey: conf.NodeKey(),
  72. Name: conf.Name,
  73. Discovery: !conf.NoDiscovery,
  74. BootstrapNodes: conf.BootstrapNodes,
  75. StaticNodes: conf.StaticNodes(),
  76. TrustedNodes: conf.TrusterNodes(),
  77. NodeDatabase: nodeDbPath,
  78. ListenAddr: conf.ListenAddr,
  79. NAT: conf.NAT,
  80. Dialer: conf.Dialer,
  81. NoDial: conf.NoDial,
  82. MaxPeers: conf.MaxPeers,
  83. MaxPendingPeers: conf.MaxPendingPeers,
  84. },
  85. serviceFuncs: []ServiceConstructor{},
  86. ipcEndpoint: conf.IpcEndpoint(),
  87. eventmux: new(event.TypeMux),
  88. }, nil
  89. }
  90. // Register injects a new service into the node's stack. The service created by
  91. // the passed constructor must be unique in its type with regard to sibling ones.
  92. func (n *Node) Register(constructor ServiceConstructor) error {
  93. n.lock.Lock()
  94. defer n.lock.Unlock()
  95. if n.server != nil {
  96. return ErrNodeRunning
  97. }
  98. n.serviceFuncs = append(n.serviceFuncs, constructor)
  99. return nil
  100. }
// Start creates a live P2P node and starts running it.
func (n *Node) Start() error {
	n.lock.Lock()
	defer n.lock.Unlock()

	// Short circuit if the node's already running
	if n.server != nil {
		return ErrNodeRunning
	}
	// Otherwise copy and specialize the P2P configuration. The copy keeps
	// serverConfig pristine so a later Restart begins from a clean template.
	running := new(p2p.Server)
	*running = *n.serverConfig

	// Instantiate the services in registration (dependency) order.
	services := make(map[reflect.Type]Service)
	for _, constructor := range n.serviceFuncs {
		// Create a new context for the particular service
		ctx := &ServiceContext{
			datadir:  n.datadir,
			services: make(map[reflect.Type]Service),
			EventMux: n.eventmux,
		}
		// Each service gets its own snapshot of previously built services.
		for kind, s := range services { // copy needed for threaded access
			ctx.services[kind] = s
		}
		// Construct and save the service
		service, err := constructor(ctx)
		if err != nil {
			return err
		}
		// Services are keyed by concrete type: only one instance per type.
		kind := reflect.TypeOf(service)
		if _, exists := services[kind]; exists {
			return &DuplicateServiceError{Kind: kind}
		}
		services[kind] = service
	}
	// Gather the protocols and start the freshly assembled P2P server
	for _, service := range services {
		running.Protocols = append(running.Protocols, service.Protocols()...)
	}
	if err := running.Start(); err != nil {
		// Map platform "resource busy" errnos onto a friendlier error.
		if errno, ok := err.(syscall.Errno); ok && datadirInUseErrnos[uint(errno)] {
			return ErrDatadirUsed
		}
		return err
	}
	// Start each of the services
	started := []reflect.Type{}
	for kind, service := range services {
		// Start the next service, stopping all previous upon failure
		if err := service.Start(running); err != nil {
			for _, kind := range started {
				services[kind].Stop()
			}
			running.Stop()
			return err
		}
		// Mark the service started for potential cleanup
		started = append(started, kind)
	}
	// Lastly start the configured RPC interfaces; on failure unwind all
	// services and the P2P server so no partial stack is left running.
	if err := n.startRPC(services); err != nil {
		for _, service := range services {
			service.Stop()
		}
		running.Stop()
		return err
	}
	// Finish initializing the startup; n.server != nil is the "running" flag
	// and n.stop releases Wait callers when closed by Stop.
	n.services = services
	n.server = running
	n.stop = make(chan struct{})
	return nil
}
// startRPC initializes and starts the IPC RPC endpoint. An empty ipcEndpoint
// disables IPC entirely; in that case only the handler is retained.
func (n *Node) startRPC(services map[reflect.Type]Service) error {
	// Gather and register all the APIs exposed by the services
	apis := n.apis()
	for _, service := range services {
		apis = append(apis, service.APIs()...)
	}
	ipcHandler := rpc.NewServer()
	for _, api := range apis {
		if err := ipcHandler.RegisterName(api.Namespace, api.Service); err != nil {
			return err
		}
		glog.V(logger.Debug).Infof("Register %T under namespace '%s'", api.Service, api.Namespace)
	}
	// All APIs registered, start the IPC listener
	var (
		ipcListener net.Listener
		err         error
	)
	if n.ipcEndpoint != "" {
		if ipcListener, err = rpc.CreateIPCListener(n.ipcEndpoint); err != nil {
			return err
		}
		// Accept loop: serves one JSON codec per connection until Stop tears
		// the listener down (Stop nils out n.ipcListener, which is the
		// shutdown signal checked below).
		go func() {
			glog.V(logger.Info).Infof("IPC endpoint opened: %s", n.ipcEndpoint)
			defer glog.V(logger.Info).Infof("IPC endpoint closed: %s", n.ipcEndpoint)
			for {
				conn, err := ipcListener.Accept()
				if err != nil {
					// Terminate if the listener was closed
					n.lock.RLock()
					closed := n.ipcListener == nil
					n.lock.RUnlock()
					if closed {
						return
					}
					// Not closed, just some error; report and continue
					glog.V(logger.Error).Infof("IPC accept failed: %v", err)
					continue
				}
				// Each accepted connection is served on its own goroutine.
				go ipcHandler.ServeCodec(rpc.NewJSONCodec(conn))
			}
		}()
	}
	// All listeners booted successfully
	n.ipcListener = ipcListener
	n.ipcHandler = ipcHandler
	return nil
}
// Stop terminates a running node along with all its services. If the node was
// not started, an error is returned.
func (n *Node) Stop() error {
	n.lock.Lock()
	defer n.lock.Unlock()

	// Short circuit if the node's not running
	if n.server == nil {
		return ErrNodeStopped
	}
	// Otherwise terminate the API, all services and the P2P server too.
	// Closing the listener unblocks the accept loop in startRPC; nilling the
	// field is its signal to exit rather than retry.
	if n.ipcListener != nil {
		n.ipcListener.Close()
		n.ipcListener = nil
	}
	if n.ipcHandler != nil {
		n.ipcHandler.Stop()
		n.ipcHandler = nil
	}
	// Stop every service, collecting failures instead of aborting so the
	// remaining services and the P2P server still get shut down.
	failure := &StopError{
		Services: make(map[reflect.Type]error),
	}
	for kind, service := range n.services {
		if err := service.Stop(); err != nil {
			failure.Services[kind] = err
		}
	}
	n.server.Stop()
	n.services = nil
	n.server = nil
	// Release any goroutines blocked in Wait.
	close(n.stop)
	if len(failure.Services) > 0 {
		return failure
	}
	return nil
}
  256. // Wait blocks the thread until the node is stopped. If the node is not running
  257. // at the time of invocation, the method immediately returns.
  258. func (n *Node) Wait() {
  259. n.lock.RLock()
  260. if n.server == nil {
  261. return
  262. }
  263. stop := n.stop
  264. n.lock.RUnlock()
  265. <-stop
  266. }
  267. // Restart terminates a running node and boots up a new one in its place. If the
  268. // node isn't running, an error is returned.
  269. func (n *Node) Restart() error {
  270. if err := n.Stop(); err != nil {
  271. return err
  272. }
  273. if err := n.Start(); err != nil {
  274. return err
  275. }
  276. return nil
  277. }
  278. // Server retrieves the currently running P2P network layer. This method is meant
  279. // only to inspect fields of the currently running server, life cycle management
  280. // should be left to this Node entity.
  281. func (n *Node) Server() *p2p.Server {
  282. n.lock.RLock()
  283. defer n.lock.RUnlock()
  284. return n.server
  285. }
  286. // Service retrieves a currently running service registered of a specific type.
  287. func (n *Node) Service(service interface{}) error {
  288. n.lock.RLock()
  289. defer n.lock.RUnlock()
  290. // Short circuit if the node's not running
  291. if n.server == nil {
  292. return ErrNodeStopped
  293. }
  294. // Otherwise try to find the service to return
  295. element := reflect.ValueOf(service).Elem()
  296. if running, ok := n.services[element.Type()]; ok {
  297. element.Set(reflect.ValueOf(running))
  298. return nil
  299. }
  300. return ErrServiceUnknown
  301. }
// DataDir retrieves the current datadir used by the protocol stack.
// No locking needed: datadir is assigned once in New and never mutated.
func (n *Node) DataDir() string {
	return n.datadir
}
// IpcEndpoint retrieves the current IPC endpoint used by the protocol stack.
// No locking needed: ipcEndpoint is assigned once in New and never mutated.
func (n *Node) IpcEndpoint() string {
	return n.ipcEndpoint
}
// EventMux retrieves the event multiplexer used by all the network services in
// the current protocol stack.
// No locking needed: eventmux is assigned once in New and never mutated.
func (n *Node) EventMux() *event.TypeMux {
	return n.eventmux
}
  315. // apis returns the collection of RPC descriptors this node offers.
  316. func (n *Node) apis() []rpc.API {
  317. return []rpc.API{
  318. {
  319. Namespace: "admin",
  320. Version: "1.0",
  321. Service: NewPrivateAdminAPI(n),
  322. }, {
  323. Namespace: "admin",
  324. Version: "1.0",
  325. Service: NewPublicAdminAPI(n),
  326. Public: true,
  327. }, {
  328. Namespace: "debug",
  329. Version: "1.0",
  330. Service: debug.Handler,
  331. }, {
  332. Namespace: "debug",
  333. Version: "1.0",
  334. Service: NewPublicDebugAPI(n),
  335. Public: true,
  336. }, {
  337. Namespace: "web3",
  338. Version: "1.0",
  339. Service: NewPublicWeb3API(n),
  340. Public: true,
  341. },
  342. }
  343. }
  344. // APIs returns the collection of RPC descriptor this node offers. This method
  345. // is just a quick placeholder passthrough for the RPC update, which in the next
  346. // step will be fully integrated into the node itself.
  347. func (n *Node) APIs() []rpc.API {
  348. apis := n.apis()
  349. for _, api := range n.services {
  350. apis = append(apis, api.APIs()...)
  351. }
  352. return apis
  353. }