node.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484
  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. // Package node represents the Ethereum protocol stack container.
  17. package node
  18. import (
  19. "errors"
  20. "net"
  21. "os"
  22. "path/filepath"
  23. "reflect"
  24. "sync"
  25. "syscall"
  26. "github.com/ethereum/go-ethereum/event"
  27. "github.com/ethereum/go-ethereum/internal/debug"
  28. "github.com/ethereum/go-ethereum/logger"
  29. "github.com/ethereum/go-ethereum/logger/glog"
  30. "github.com/ethereum/go-ethereum/p2p"
  31. "github.com/ethereum/go-ethereum/rpc"
  32. )
  33. var (
  34. ErrDatadirUsed = errors.New("datadir already used")
  35. ErrNodeStopped = errors.New("node not started")
  36. ErrNodeRunning = errors.New("node already running")
  37. ErrServiceUnknown = errors.New("unknown service")
  38. datadirInUseErrnos = map[uint]bool{11: true, 32: true, 35: true}
  39. )
  40. // Node represents a P2P node into which arbitrary (uniquely typed) services might
  41. // be registered.
  42. type Node struct {
  43. datadir string // Path to the currently used data directory
  44. eventmux *event.TypeMux // Event multiplexer used between the services of a stack
  45. serverConfig *p2p.Server // Configuration of the underlying P2P networking layer
  46. server *p2p.Server // Currently running P2P networking layer
  47. serviceFuncs []ServiceConstructor // Service constructors (in dependency order)
  48. services map[reflect.Type]Service // Currently running services
  49. rpcAPIs []rpc.API // List of APIs currently provided by the node
  50. ipcEndpoint string // IPC endpoint to listen at (empty = IPC disabled)
  51. ipcListener net.Listener // IPC RPC listener socket to serve API requests
  52. ipcHandler *rpc.Server // IPC RPC request handler to process the API requests
  53. httpEndpoint string // HTTP endpoint (interface + port) to listen at (empty = HTTP disabled)
  54. httpWhitelist []string // HTTP RPC modules to allow through this endpoint
  55. httpCors string // HTTP RPC Cross-Origin Resource Sharing header
  56. httpListener net.Listener // HTTP RPC listener socket to server API requests
  57. httpHandler *rpc.Server // HTTP RPC request handler to process the API requests
  58. stop chan struct{} // Channel to wait for termination notifications
  59. lock sync.RWMutex
  60. }
  61. // New creates a new P2P node, ready for protocol registration.
  62. func New(conf *Config) (*Node, error) {
  63. // Ensure the data directory exists, failing if it cannot be created
  64. if conf.DataDir != "" {
  65. if err := os.MkdirAll(conf.DataDir, 0700); err != nil {
  66. return nil, err
  67. }
  68. }
  69. // Assemble the networking layer and the node itself
  70. nodeDbPath := ""
  71. if conf.DataDir != "" {
  72. nodeDbPath = filepath.Join(conf.DataDir, datadirNodeDatabase)
  73. }
  74. return &Node{
  75. datadir: conf.DataDir,
  76. serverConfig: &p2p.Server{
  77. PrivateKey: conf.NodeKey(),
  78. Name: conf.Name,
  79. Discovery: !conf.NoDiscovery,
  80. BootstrapNodes: conf.BootstrapNodes,
  81. StaticNodes: conf.StaticNodes(),
  82. TrustedNodes: conf.TrusterNodes(),
  83. NodeDatabase: nodeDbPath,
  84. ListenAddr: conf.ListenAddr,
  85. NAT: conf.NAT,
  86. Dialer: conf.Dialer,
  87. NoDial: conf.NoDial,
  88. MaxPeers: conf.MaxPeers,
  89. MaxPendingPeers: conf.MaxPendingPeers,
  90. },
  91. serviceFuncs: []ServiceConstructor{},
  92. ipcEndpoint: conf.IpcEndpoint(),
  93. httpEndpoint: conf.HttpEndpoint(),
  94. httpWhitelist: conf.HttpModules,
  95. httpCors: conf.HttpCors,
  96. eventmux: new(event.TypeMux),
  97. }, nil
  98. }
  99. // Register injects a new service into the node's stack. The service created by
  100. // the passed constructor must be unique in its type with regard to sibling ones.
  101. func (n *Node) Register(constructor ServiceConstructor) error {
  102. n.lock.Lock()
  103. defer n.lock.Unlock()
  104. if n.server != nil {
  105. return ErrNodeRunning
  106. }
  107. n.serviceFuncs = append(n.serviceFuncs, constructor)
  108. return nil
  109. }
  110. // Start create a live P2P node and starts running it.
  111. func (n *Node) Start() error {
  112. n.lock.Lock()
  113. defer n.lock.Unlock()
  114. // Short circuit if the node's already running
  115. if n.server != nil {
  116. return ErrNodeRunning
  117. }
  118. // Otherwise copy and specialize the P2P configuration
  119. running := new(p2p.Server)
  120. *running = *n.serverConfig
  121. services := make(map[reflect.Type]Service)
  122. for _, constructor := range n.serviceFuncs {
  123. // Create a new context for the particular service
  124. ctx := &ServiceContext{
  125. datadir: n.datadir,
  126. services: make(map[reflect.Type]Service),
  127. EventMux: n.eventmux,
  128. }
  129. for kind, s := range services { // copy needed for threaded access
  130. ctx.services[kind] = s
  131. }
  132. // Construct and save the service
  133. service, err := constructor(ctx)
  134. if err != nil {
  135. return err
  136. }
  137. kind := reflect.TypeOf(service)
  138. if _, exists := services[kind]; exists {
  139. return &DuplicateServiceError{Kind: kind}
  140. }
  141. services[kind] = service
  142. }
  143. // Gather the protocols and start the freshly assembled P2P server
  144. for _, service := range services {
  145. running.Protocols = append(running.Protocols, service.Protocols()...)
  146. }
  147. if err := running.Start(); err != nil {
  148. if errno, ok := err.(syscall.Errno); ok && datadirInUseErrnos[uint(errno)] {
  149. return ErrDatadirUsed
  150. }
  151. return err
  152. }
  153. // Start each of the services
  154. started := []reflect.Type{}
  155. for kind, service := range services {
  156. // Start the next service, stopping all previous upon failure
  157. if err := service.Start(running); err != nil {
  158. for _, kind := range started {
  159. services[kind].Stop()
  160. }
  161. running.Stop()
  162. return err
  163. }
  164. // Mark the service started for potential cleanup
  165. started = append(started, kind)
  166. }
  167. // Lastly start the configured RPC interfaces
  168. if err := n.startRPC(services); err != nil {
  169. for _, service := range services {
  170. service.Stop()
  171. }
  172. running.Stop()
  173. return err
  174. }
  175. // Finish initializing the startup
  176. n.services = services
  177. n.server = running
  178. n.stop = make(chan struct{})
  179. return nil
  180. }
  181. // startRPC is a helper method to start all the various RPC endpoint during node
  182. // startup. It's not meant to be called at any time afterwards as it makes certain
  183. // assumptions about the state of the node.
  184. func (n *Node) startRPC(services map[reflect.Type]Service) error {
  185. // Gather all the possible APIs to surface
  186. apis := n.apis()
  187. for _, service := range services {
  188. apis = append(apis, service.APIs()...)
  189. }
  190. // Start the various API endpoints, terminating all in case of errors
  191. if err := n.startIPC(apis); err != nil {
  192. return err
  193. }
  194. if err := n.startHTTP(n.httpEndpoint, apis, n.httpWhitelist, n.httpCors); err != nil {
  195. n.stopIPC()
  196. return err
  197. }
  198. // All API endpoints started successfully
  199. n.rpcAPIs = apis
  200. return nil
  201. }
  202. // startIPC initializes and starts the IPC RPC endpoint.
  203. func (n *Node) startIPC(apis []rpc.API) error {
  204. // Short circuit if the IPC endpoint isn't being exposed
  205. if n.ipcEndpoint == "" {
  206. return nil
  207. }
  208. // Register all the APIs exposed by the services
  209. handler := rpc.NewServer()
  210. for _, api := range apis {
  211. if err := handler.RegisterName(api.Namespace, api.Service); err != nil {
  212. return err
  213. }
  214. glog.V(logger.Debug).Infof("IPC registered %T under '%s'", api.Service, api.Namespace)
  215. }
  216. // All APIs registered, start the IPC listener
  217. var (
  218. listener net.Listener
  219. err error
  220. )
  221. if listener, err = rpc.CreateIPCListener(n.ipcEndpoint); err != nil {
  222. return err
  223. }
  224. go func() {
  225. glog.V(logger.Info).Infof("IPC endpoint opened: %s", n.ipcEndpoint)
  226. for {
  227. conn, err := listener.Accept()
  228. if err != nil {
  229. // Terminate if the listener was closed
  230. n.lock.RLock()
  231. closed := n.ipcListener == nil
  232. n.lock.RUnlock()
  233. if closed {
  234. return
  235. }
  236. // Not closed, just some error; report and continue
  237. glog.V(logger.Error).Infof("IPC accept failed: %v", err)
  238. continue
  239. }
  240. go handler.ServeCodec(rpc.NewJSONCodec(conn))
  241. }
  242. }()
  243. // All listeners booted successfully
  244. n.ipcListener = listener
  245. n.ipcHandler = handler
  246. return nil
  247. }
  248. // stopIPC terminates the IPC RPC endpoint.
  249. func (n *Node) stopIPC() {
  250. if n.ipcListener != nil {
  251. n.ipcListener.Close()
  252. n.ipcListener = nil
  253. glog.V(logger.Info).Infof("IPC endpoint closed: %s", n.ipcEndpoint)
  254. }
  255. if n.ipcHandler != nil {
  256. n.ipcHandler.Stop()
  257. n.ipcHandler = nil
  258. }
  259. }
  260. // startHTTP initializes and starts the HTTP RPC endpoint.
  261. func (n *Node) startHTTP(endpoint string, apis []rpc.API, modules []string, cors string) error {
  262. // Short circuit if the IPC endpoint isn't being exposed
  263. if endpoint == "" {
  264. return nil
  265. }
  266. // Generate the whitelist based on the allowed modules
  267. whitelist := make(map[string]bool)
  268. for _, module := range modules {
  269. whitelist[module] = true
  270. }
  271. // Register all the APIs exposed by the services
  272. handler := rpc.NewServer()
  273. for _, api := range apis {
  274. if whitelist[api.Namespace] || (len(whitelist) == 0 && api.Public) {
  275. if err := handler.RegisterName(api.Namespace, api.Service); err != nil {
  276. return err
  277. }
  278. glog.V(logger.Debug).Infof("HTTP registered %T under '%s'", api.Service, api.Namespace)
  279. }
  280. }
  281. // All APIs registered, start the HTTP listener
  282. var (
  283. listener net.Listener
  284. err error
  285. )
  286. if listener, err = net.Listen("tcp", endpoint); err != nil {
  287. return err
  288. }
  289. go rpc.NewHTTPServer(cors, handler).Serve(listener)
  290. glog.V(logger.Info).Infof("HTTP endpoint opened: http://%s", endpoint)
  291. // All listeners booted successfully
  292. n.httpEndpoint = endpoint
  293. n.httpListener = listener
  294. n.httpHandler = handler
  295. n.httpCors = cors
  296. return nil
  297. }
  298. // stopHTTP terminates the HTTP RPC endpoint.
  299. func (n *Node) stopHTTP() {
  300. if n.httpListener != nil {
  301. n.httpListener.Close()
  302. n.httpListener = nil
  303. glog.V(logger.Info).Infof("HTTP endpoint closed: http://%s", n.httpEndpoint)
  304. }
  305. if n.httpHandler != nil {
  306. n.httpHandler.Stop()
  307. n.httpHandler = nil
  308. }
  309. }
  310. // Stop terminates a running node along with all it's services. In the node was
  311. // not started, an error is returned.
  312. func (n *Node) Stop() error {
  313. n.lock.Lock()
  314. defer n.lock.Unlock()
  315. // Short circuit if the node's not running
  316. if n.server == nil {
  317. return ErrNodeStopped
  318. }
  319. // Otherwise terminate the API, all services and the P2P server too
  320. n.stopIPC()
  321. n.stopHTTP()
  322. n.rpcAPIs = nil
  323. failure := &StopError{
  324. Services: make(map[reflect.Type]error),
  325. }
  326. for kind, service := range n.services {
  327. if err := service.Stop(); err != nil {
  328. failure.Services[kind] = err
  329. }
  330. }
  331. n.server.Stop()
  332. n.services = nil
  333. n.server = nil
  334. close(n.stop)
  335. if len(failure.Services) > 0 {
  336. return failure
  337. }
  338. return nil
  339. }
  340. // Wait blocks the thread until the node is stopped. If the node is not running
  341. // at the time of invocation, the method immediately returns.
  342. func (n *Node) Wait() {
  343. n.lock.RLock()
  344. if n.server == nil {
  345. return
  346. }
  347. stop := n.stop
  348. n.lock.RUnlock()
  349. <-stop
  350. }
  351. // Restart terminates a running node and boots up a new one in its place. If the
  352. // node isn't running, an error is returned.
  353. func (n *Node) Restart() error {
  354. if err := n.Stop(); err != nil {
  355. return err
  356. }
  357. if err := n.Start(); err != nil {
  358. return err
  359. }
  360. return nil
  361. }
  362. // Server retrieves the currently running P2P network layer. This method is meant
  363. // only to inspect fields of the currently running server, life cycle management
  364. // should be left to this Node entity.
  365. func (n *Node) Server() *p2p.Server {
  366. n.lock.RLock()
  367. defer n.lock.RUnlock()
  368. return n.server
  369. }
  370. // Service retrieves a currently running service registered of a specific type.
  371. func (n *Node) Service(service interface{}) error {
  372. n.lock.RLock()
  373. defer n.lock.RUnlock()
  374. // Short circuit if the node's not running
  375. if n.server == nil {
  376. return ErrNodeStopped
  377. }
  378. // Otherwise try to find the service to return
  379. element := reflect.ValueOf(service).Elem()
  380. if running, ok := n.services[element.Type()]; ok {
  381. element.Set(reflect.ValueOf(running))
  382. return nil
  383. }
  384. return ErrServiceUnknown
  385. }
  386. // DataDir retrieves the current datadir used by the protocol stack.
  387. func (n *Node) DataDir() string {
  388. return n.datadir
  389. }
  390. // IpcEndpoint retrieves the current IPC endpoint used by the protocol stack.
  391. func (n *Node) IpcEndpoint() string {
  392. return n.ipcEndpoint
  393. }
  394. // EventMux retrieves the event multiplexer used by all the network services in
  395. // the current protocol stack.
  396. func (n *Node) EventMux() *event.TypeMux {
  397. return n.eventmux
  398. }
  399. // apis returns the collection of RPC descriptors this node offers.
  400. func (n *Node) apis() []rpc.API {
  401. return []rpc.API{
  402. {
  403. Namespace: "admin",
  404. Version: "1.0",
  405. Service: NewPrivateAdminAPI(n),
  406. }, {
  407. Namespace: "admin",
  408. Version: "1.0",
  409. Service: NewPublicAdminAPI(n),
  410. Public: true,
  411. }, {
  412. Namespace: "debug",
  413. Version: "1.0",
  414. Service: debug.Handler,
  415. }, {
  416. Namespace: "debug",
  417. Version: "1.0",
  418. Service: NewPublicDebugAPI(n),
  419. Public: true,
  420. }, {
  421. Namespace: "web3",
  422. Version: "1.0",
  423. Service: NewPublicWeb3API(n),
  424. Public: true,
  425. },
  426. }
  427. }
  428. // APIs returns the collection of RPC descriptor this node offers. This method
  429. // is just a quick placeholder passthrough for the RPC update, which in the next
  430. // step will be fully integrated into the node itself.
  431. func (n *Node) APIs() []rpc.API {
  432. apis := n.apis()
  433. for _, api := range n.services {
  434. apis = append(apis, api.APIs()...)
  435. }
  436. return apis
  437. }