node.go 17 KB


  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package node
  17. import (
  18. "errors"
  19. "fmt"
  20. "net/http"
  21. "os"
  22. "path/filepath"
  23. "reflect"
  24. "strings"
  25. "sync"
  26. "github.com/ethereum/go-ethereum/accounts"
  27. "github.com/ethereum/go-ethereum/core/rawdb"
  28. "github.com/ethereum/go-ethereum/ethdb"
  29. "github.com/ethereum/go-ethereum/event"
  30. "github.com/ethereum/go-ethereum/log"
  31. "github.com/ethereum/go-ethereum/p2p"
  32. "github.com/ethereum/go-ethereum/rpc"
  33. "github.com/prometheus/tsdb/fileutil"
  34. )
  35. // Node is a container on which services can be registered.
  36. type Node struct {
  37. eventmux *event.TypeMux
  38. config *Config
  39. accman *accounts.Manager
  40. log log.Logger
  41. ephemKeystore string // if non-empty, the key directory that will be removed by Stop
  42. dirLock fileutil.Releaser // prevents concurrent use of instance directory
  43. stop chan struct{} // Channel to wait for termination notifications
  44. server *p2p.Server // Currently running P2P networking layer
  45. startStopLock sync.Mutex // Start/Stop are protected by an additional lock
  46. state int // Tracks state of node lifecycle
  47. lock sync.Mutex
  48. lifecycles []Lifecycle // All registered backends, services, and auxiliary services that have a lifecycle
  49. rpcAPIs []rpc.API // List of APIs currently provided by the node
  50. http *httpServer //
  51. ws *httpServer //
  52. ipc *ipcServer // Stores information about the ipc http server
  53. inprocHandler *rpc.Server // In-process RPC request handler to process the API requests
  54. databases map[*closeTrackingDB]struct{} // All open databases
  55. }
  56. const (
  57. initializingState = iota
  58. runningState
  59. closedState
  60. )
  61. // New creates a new P2P node, ready for protocol registration.
  62. func New(conf *Config) (*Node, error) {
  63. // Copy config and resolve the datadir so future changes to the current
  64. // working directory don't affect the node.
  65. confCopy := *conf
  66. conf = &confCopy
  67. if conf.DataDir != "" {
  68. absdatadir, err := filepath.Abs(conf.DataDir)
  69. if err != nil {
  70. return nil, err
  71. }
  72. conf.DataDir = absdatadir
  73. }
  74. if conf.Logger == nil {
  75. conf.Logger = log.New()
  76. }
  77. // Ensure that the instance name doesn't cause weird conflicts with
  78. // other files in the data directory.
  79. if strings.ContainsAny(conf.Name, `/\`) {
  80. return nil, errors.New(`Config.Name must not contain '/' or '\'`)
  81. }
  82. if conf.Name == datadirDefaultKeyStore {
  83. return nil, errors.New(`Config.Name cannot be "` + datadirDefaultKeyStore + `"`)
  84. }
  85. if strings.HasSuffix(conf.Name, ".ipc") {
  86. return nil, errors.New(`Config.Name cannot end in ".ipc"`)
  87. }
  88. node := &Node{
  89. config: conf,
  90. inprocHandler: rpc.NewServer(),
  91. eventmux: new(event.TypeMux),
  92. log: conf.Logger,
  93. stop: make(chan struct{}),
  94. server: &p2p.Server{Config: conf.P2P},
  95. databases: make(map[*closeTrackingDB]struct{}),
  96. }
  97. // Register built-in APIs.
  98. node.rpcAPIs = append(node.rpcAPIs, node.apis()...)
  99. // Acquire the instance directory lock.
  100. if err := node.openDataDir(); err != nil {
  101. return nil, err
  102. }
  103. // Ensure that the AccountManager method works before the node has started. We rely on
  104. // this in cmd/geth.
  105. am, ephemeralKeystore, err := makeAccountManager(conf)
  106. if err != nil {
  107. return nil, err
  108. }
  109. node.accman = am
  110. node.ephemKeystore = ephemeralKeystore
  111. // Initialize the p2p server. This creates the node key and discovery databases.
  112. node.server.Config.PrivateKey = node.config.NodeKey()
  113. node.server.Config.Name = node.config.NodeName()
  114. node.server.Config.Logger = node.log
  115. if node.server.Config.StaticNodes == nil {
  116. node.server.Config.StaticNodes = node.config.StaticNodes()
  117. }
  118. if node.server.Config.TrustedNodes == nil {
  119. node.server.Config.TrustedNodes = node.config.TrustedNodes()
  120. }
  121. if node.server.Config.NodeDatabase == "" {
  122. node.server.Config.NodeDatabase = node.config.NodeDB()
  123. }
  124. // Configure RPC servers.
  125. node.http = newHTTPServer(node.log, conf.HTTPTimeouts)
  126. node.ws = newHTTPServer(node.log, rpc.DefaultHTTPTimeouts)
  127. node.ipc = newIPCServer(node.log, conf.IPCEndpoint())
  128. return node, nil
  129. }
  130. // Start starts all registered lifecycles, RPC services and p2p networking.
  131. // Node can only be started once.
  132. func (n *Node) Start() error {
  133. n.startStopLock.Lock()
  134. defer n.startStopLock.Unlock()
  135. n.lock.Lock()
  136. switch n.state {
  137. case runningState:
  138. n.lock.Unlock()
  139. return ErrNodeRunning
  140. case closedState:
  141. n.lock.Unlock()
  142. return ErrNodeStopped
  143. }
  144. n.state = runningState
  145. err := n.startNetworking()
  146. lifecycles := make([]Lifecycle, len(n.lifecycles))
  147. copy(lifecycles, n.lifecycles)
  148. n.lock.Unlock()
  149. // Check if networking startup failed.
  150. if err != nil {
  151. n.doClose(nil)
  152. return err
  153. }
  154. // Start all registered lifecycles.
  155. var started []Lifecycle
  156. for _, lifecycle := range lifecycles {
  157. if err = lifecycle.Start(); err != nil {
  158. break
  159. }
  160. started = append(started, lifecycle)
  161. }
  162. // Check if any lifecycle failed to start.
  163. if err != nil {
  164. n.stopServices(started)
  165. n.doClose(nil)
  166. }
  167. return err
  168. }
  169. // Close stops the Node and releases resources acquired in
  170. // Node constructor New.
  171. func (n *Node) Close() error {
  172. n.startStopLock.Lock()
  173. defer n.startStopLock.Unlock()
  174. n.lock.Lock()
  175. state := n.state
  176. n.lock.Unlock()
  177. switch state {
  178. case initializingState:
  179. // The node was never started.
  180. return n.doClose(nil)
  181. case runningState:
  182. // The node was started, release resources acquired by Start().
  183. var errs []error
  184. if err := n.stopServices(n.lifecycles); err != nil {
  185. errs = append(errs, err)
  186. }
  187. return n.doClose(errs)
  188. case closedState:
  189. return ErrNodeStopped
  190. default:
  191. panic(fmt.Sprintf("node is in unknown state %d", state))
  192. }
  193. }
  194. // doClose releases resources acquired by New(), collecting errors.
  195. func (n *Node) doClose(errs []error) error {
  196. // Close databases. This needs the lock because it needs to
  197. // synchronize with OpenDatabase*.
  198. n.lock.Lock()
  199. n.state = closedState
  200. errs = append(errs, n.closeDatabases()...)
  201. n.lock.Unlock()
  202. if err := n.accman.Close(); err != nil {
  203. errs = append(errs, err)
  204. }
  205. if n.ephemKeystore != "" {
  206. if err := os.RemoveAll(n.ephemKeystore); err != nil {
  207. errs = append(errs, err)
  208. }
  209. }
  210. // Release instance directory lock.
  211. n.closeDataDir()
  212. // Unblock n.Wait.
  213. close(n.stop)
  214. // Report any errors that might have occurred.
  215. switch len(errs) {
  216. case 0:
  217. return nil
  218. case 1:
  219. return errs[0]
  220. default:
  221. return fmt.Errorf("%v", errs)
  222. }
  223. }
  224. // startNetworking starts all network endpoints.
  225. func (n *Node) startNetworking() error {
  226. n.log.Info("Starting peer-to-peer node", "instance", n.server.Name)
  227. if err := n.server.Start(); err != nil {
  228. return convertFileLockError(err)
  229. }
  230. err := n.startRPC()
  231. if err != nil {
  232. n.stopRPC()
  233. n.server.Stop()
  234. }
  235. return err
  236. }
  237. // containsLifecycle checks if 'lfs' contains 'l'.
  238. func containsLifecycle(lfs []Lifecycle, l Lifecycle) bool {
  239. for _, obj := range lfs {
  240. if obj == l {
  241. return true
  242. }
  243. }
  244. return false
  245. }
  246. // stopServices terminates running services, RPC and p2p networking.
  247. // It is the inverse of Start.
  248. func (n *Node) stopServices(running []Lifecycle) error {
  249. n.stopRPC()
  250. // Stop running lifecycles in reverse order.
  251. failure := &StopError{Services: make(map[reflect.Type]error)}
  252. for i := len(running) - 1; i >= 0; i-- {
  253. if err := running[i].Stop(); err != nil {
  254. failure.Services[reflect.TypeOf(running[i])] = err
  255. }
  256. }
  257. // Stop p2p networking.
  258. n.server.Stop()
  259. if len(failure.Services) > 0 {
  260. return failure
  261. }
  262. return nil
  263. }
  264. func (n *Node) openDataDir() error {
  265. if n.config.DataDir == "" {
  266. return nil // ephemeral
  267. }
  268. instdir := filepath.Join(n.config.DataDir, n.config.name())
  269. if err := os.MkdirAll(instdir, 0700); err != nil {
  270. return err
  271. }
  272. // Lock the instance directory to prevent concurrent use by another instance as well as
  273. // accidental use of the instance directory as a database.
  274. release, _, err := fileutil.Flock(filepath.Join(instdir, "LOCK"))
  275. if err != nil {
  276. return convertFileLockError(err)
  277. }
  278. n.dirLock = release
  279. return nil
  280. }
  281. func (n *Node) closeDataDir() {
  282. // Release instance directory lock.
  283. if n.dirLock != nil {
  284. if err := n.dirLock.Release(); err != nil {
  285. n.log.Error("Can't release datadir lock", "err", err)
  286. }
  287. n.dirLock = nil
  288. }
  289. }
  290. // configureRPC is a helper method to configure all the various RPC endpoints during node
  291. // startup. It's not meant to be called at any time afterwards as it makes certain
  292. // assumptions about the state of the node.
  293. func (n *Node) startRPC() error {
  294. if err := n.startInProc(); err != nil {
  295. return err
  296. }
  297. // Configure IPC.
  298. if n.ipc.endpoint != "" {
  299. if err := n.ipc.start(n.rpcAPIs); err != nil {
  300. return err
  301. }
  302. }
  303. // Configure HTTP.
  304. if n.config.HTTPHost != "" {
  305. config := httpConfig{
  306. CorsAllowedOrigins: n.config.HTTPCors,
  307. Vhosts: n.config.HTTPVirtualHosts,
  308. Modules: n.config.HTTPModules,
  309. }
  310. if err := n.http.setListenAddr(n.config.HTTPHost, n.config.HTTPPort); err != nil {
  311. return err
  312. }
  313. if err := n.http.enableRPC(n.rpcAPIs, config); err != nil {
  314. return err
  315. }
  316. }
  317. // Configure WebSocket.
  318. if n.config.WSHost != "" {
  319. server := n.wsServerForPort(n.config.WSPort)
  320. config := wsConfig{
  321. Modules: n.config.WSModules,
  322. Origins: n.config.WSOrigins,
  323. }
  324. if err := server.setListenAddr(n.config.WSHost, n.config.WSPort); err != nil {
  325. return err
  326. }
  327. if err := server.enableWS(n.rpcAPIs, config); err != nil {
  328. return err
  329. }
  330. }
  331. if err := n.http.start(); err != nil {
  332. return err
  333. }
  334. return n.ws.start()
  335. }
  336. func (n *Node) wsServerForPort(port int) *httpServer {
  337. if n.config.HTTPHost == "" || n.http.port == port {
  338. return n.http
  339. }
  340. return n.ws
  341. }
  342. func (n *Node) stopRPC() {
  343. n.http.stop()
  344. n.ws.stop()
  345. n.ipc.stop()
  346. n.stopInProc()
  347. }
  348. // startInProc registers all RPC APIs on the inproc server.
  349. func (n *Node) startInProc() error {
  350. for _, api := range n.rpcAPIs {
  351. if err := n.inprocHandler.RegisterName(api.Namespace, api.Service); err != nil {
  352. return err
  353. }
  354. }
  355. return nil
  356. }
  357. // stopInProc terminates the in-process RPC endpoint.
  358. func (n *Node) stopInProc() {
  359. n.inprocHandler.Stop()
  360. }
  361. // Wait blocks until the node is closed.
  362. func (n *Node) Wait() {
  363. <-n.stop
  364. }
  365. // RegisterLifecycle registers the given Lifecycle on the node.
  366. func (n *Node) RegisterLifecycle(lifecycle Lifecycle) {
  367. n.lock.Lock()
  368. defer n.lock.Unlock()
  369. if n.state != initializingState {
  370. panic("can't register lifecycle on running/stopped node")
  371. }
  372. if containsLifecycle(n.lifecycles, lifecycle) {
  373. panic(fmt.Sprintf("attempt to register lifecycle %T more than once", lifecycle))
  374. }
  375. n.lifecycles = append(n.lifecycles, lifecycle)
  376. }
  377. // RegisterProtocols adds backend's protocols to the node's p2p server.
  378. func (n *Node) RegisterProtocols(protocols []p2p.Protocol) {
  379. n.lock.Lock()
  380. defer n.lock.Unlock()
  381. if n.state != initializingState {
  382. panic("can't register protocols on running/stopped node")
  383. }
  384. n.server.Protocols = append(n.server.Protocols, protocols...)
  385. }
  386. // RegisterAPIs registers the APIs a service provides on the node.
  387. func (n *Node) RegisterAPIs(apis []rpc.API) {
  388. n.lock.Lock()
  389. defer n.lock.Unlock()
  390. if n.state != initializingState {
  391. panic("can't register APIs on running/stopped node")
  392. }
  393. n.rpcAPIs = append(n.rpcAPIs, apis...)
  394. }
  395. // RegisterHandler mounts a handler on the given path on the canonical HTTP server.
  396. //
  397. // The name of the handler is shown in a log message when the HTTP server starts
  398. // and should be a descriptive term for the service provided by the handler.
  399. func (n *Node) RegisterHandler(name, path string, handler http.Handler) {
  400. n.lock.Lock()
  401. defer n.lock.Unlock()
  402. if n.state != initializingState {
  403. panic("can't register HTTP handler on running/stopped node")
  404. }
  405. n.http.mux.Handle(path, handler)
  406. n.http.handlerNames[path] = name
  407. }
  408. // Attach creates an RPC client attached to an in-process API handler.
  409. func (n *Node) Attach() (*rpc.Client, error) {
  410. return rpc.DialInProc(n.inprocHandler), nil
  411. }
  412. // RPCHandler returns the in-process RPC request handler.
  413. func (n *Node) RPCHandler() (*rpc.Server, error) {
  414. n.lock.Lock()
  415. defer n.lock.Unlock()
  416. if n.state == closedState {
  417. return nil, ErrNodeStopped
  418. }
  419. return n.inprocHandler, nil
  420. }
  421. // Config returns the configuration of node.
  422. func (n *Node) Config() *Config {
  423. return n.config
  424. }
  425. // Server retrieves the currently running P2P network layer. This method is meant
  426. // only to inspect fields of the currently running server. Callers should not
  427. // start or stop the returned server.
  428. func (n *Node) Server() *p2p.Server {
  429. n.lock.Lock()
  430. defer n.lock.Unlock()
  431. return n.server
  432. }
  433. // DataDir retrieves the current datadir used by the protocol stack.
  434. // Deprecated: No files should be stored in this directory, use InstanceDir instead.
  435. func (n *Node) DataDir() string {
  436. return n.config.DataDir
  437. }
  438. // InstanceDir retrieves the instance directory used by the protocol stack.
  439. func (n *Node) InstanceDir() string {
  440. return n.config.instanceDir()
  441. }
  442. // AccountManager retrieves the account manager used by the protocol stack.
  443. func (n *Node) AccountManager() *accounts.Manager {
  444. return n.accman
  445. }
  446. // IPCEndpoint retrieves the current IPC endpoint used by the protocol stack.
  447. func (n *Node) IPCEndpoint() string {
  448. return n.ipc.endpoint
  449. }
  450. // HTTPEndpoint returns the URL of the HTTP server.
  451. func (n *Node) HTTPEndpoint() string {
  452. return "http://" + n.http.listenAddr()
  453. }
  454. // WSEndpoint retrieves the current WS endpoint used by the protocol stack.
  455. func (n *Node) WSEndpoint() string {
  456. if n.http.wsAllowed() {
  457. return "ws://" + n.http.listenAddr()
  458. }
  459. return "ws://" + n.ws.listenAddr()
  460. }
  461. // EventMux retrieves the event multiplexer used by all the network services in
  462. // the current protocol stack.
  463. func (n *Node) EventMux() *event.TypeMux {
  464. return n.eventmux
  465. }
  466. // OpenDatabase opens an existing database with the given name (or creates one if no
  467. // previous can be found) from within the node's instance directory. If the node is
  468. // ephemeral, a memory database is returned.
  469. func (n *Node) OpenDatabase(name string, cache, handles int, namespace string) (ethdb.Database, error) {
  470. n.lock.Lock()
  471. defer n.lock.Unlock()
  472. if n.state == closedState {
  473. return nil, ErrNodeStopped
  474. }
  475. var db ethdb.Database
  476. var err error
  477. if n.config.DataDir == "" {
  478. db = rawdb.NewMemoryDatabase()
  479. } else {
  480. db, err = rawdb.NewLevelDBDatabase(n.ResolvePath(name), cache, handles, namespace)
  481. }
  482. if err == nil {
  483. db = n.wrapDatabase(db)
  484. }
  485. return db, err
  486. }
  487. // OpenDatabaseWithFreezer opens an existing database with the given name (or
  488. // creates one if no previous can be found) from within the node's data directory,
  489. // also attaching a chain freezer to it that moves ancient chain data from the
  490. // database to immutable append-only files. If the node is an ephemeral one, a
  491. // memory database is returned.
  492. func (n *Node) OpenDatabaseWithFreezer(name string, cache, handles int, freezer, namespace string) (ethdb.Database, error) {
  493. n.lock.Lock()
  494. defer n.lock.Unlock()
  495. if n.state == closedState {
  496. return nil, ErrNodeStopped
  497. }
  498. var db ethdb.Database
  499. var err error
  500. if n.config.DataDir == "" {
  501. db = rawdb.NewMemoryDatabase()
  502. } else {
  503. root := n.ResolvePath(name)
  504. switch {
  505. case freezer == "":
  506. freezer = filepath.Join(root, "ancient")
  507. case !filepath.IsAbs(freezer):
  508. freezer = n.ResolvePath(freezer)
  509. }
  510. db, err = rawdb.NewLevelDBDatabaseWithFreezer(root, cache, handles, freezer, namespace)
  511. }
  512. if err == nil {
  513. db = n.wrapDatabase(db)
  514. }
  515. return db, err
  516. }
  517. // ResolvePath returns the absolute path of a resource in the instance directory.
  518. func (n *Node) ResolvePath(x string) string {
  519. return n.config.ResolvePath(x)
  520. }
  521. // closeTrackingDB wraps the Close method of a database. When the database is closed by the
  522. // service, the wrapper removes it from the node's database map. This ensures that Node
  523. // won't auto-close the database if it is closed by the service that opened it.
  524. type closeTrackingDB struct {
  525. ethdb.Database
  526. n *Node
  527. }
  528. func (db *closeTrackingDB) Close() error {
  529. db.n.lock.Lock()
  530. delete(db.n.databases, db)
  531. db.n.lock.Unlock()
  532. return db.Database.Close()
  533. }
  534. // wrapDatabase ensures the database will be auto-closed when Node is closed.
  535. func (n *Node) wrapDatabase(db ethdb.Database) ethdb.Database {
  536. wrapper := &closeTrackingDB{db, n}
  537. n.databases[wrapper] = struct{}{}
  538. return wrapper
  539. }
  540. // closeDatabases closes all open databases.
  541. func (n *Node) closeDatabases() (errors []error) {
  542. for db := range n.databases {
  543. delete(n.databases, db)
  544. if err := db.Database.Close(); err != nil {
  545. errors = append(errors, err)
  546. }
  547. }
  548. return errors
  549. }