| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306 |
- // Copyright 2015 The go-ethereum Authors
- // This file is part of the go-ethereum library.
- //
- // The go-ethereum library is free software: you can redistribute it and/or modify
- // it under the terms of the GNU Lesser General Public License as published by
- // the Free Software Foundation, either version 3 of the License, or
- // (at your option) any later version.
- //
- // The go-ethereum library is distributed in the hope that it will be useful,
- // but WITHOUT ANY WARRANTY; without even the implied warranty of
- // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- // GNU Lesser General Public License for more details.
- //
- // You should have received a copy of the GNU Lesser General Public License
- // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
- // Package node represents the Ethereum protocol stack container.
- package node
- import (
- "errors"
- "os"
- "path/filepath"
- "reflect"
- "sync"
- "syscall"
- "github.com/ethereum/go-ethereum/event"
- "github.com/ethereum/go-ethereum/internal/debug"
- "github.com/ethereum/go-ethereum/p2p"
- "github.com/ethereum/go-ethereum/rpc"
- )
- var (
- ErrDatadirUsed = errors.New("datadir already used")
- ErrNodeStopped = errors.New("node not started")
- ErrNodeRunning = errors.New("node already running")
- ErrServiceUnknown = errors.New("unknown service")
- datadirInUseErrnos = map[uint]bool{11: true, 32: true, 35: true}
- )
- // Node represents a P2P node into which arbitrary (uniquely typed) services might
- // be registered.
- type Node struct {
- datadir string // Path to the currently used data directory
- eventmux *event.TypeMux // Event multiplexer used between the services of a stack
- serverConfig *p2p.Server // Configuration of the underlying P2P networking layer
- server *p2p.Server // Currently running P2P networking layer
- serviceFuncs []ServiceConstructor // Service constructors (in dependency order)
- services map[reflect.Type]Service // Currently running services
- stop chan struct{} // Channel to wait for termination notifications
- lock sync.RWMutex
- }
- // New creates a new P2P node, ready for protocol registration.
- func New(conf *Config) (*Node, error) {
- // Ensure the data directory exists, failing if it cannot be created
- if conf.DataDir != "" {
- if err := os.MkdirAll(conf.DataDir, 0700); err != nil {
- return nil, err
- }
- }
- // Assemble the networking layer and the node itself
- nodeDbPath := ""
- if conf.DataDir != "" {
- nodeDbPath = filepath.Join(conf.DataDir, datadirNodeDatabase)
- }
- return &Node{
- datadir: conf.DataDir,
- serverConfig: &p2p.Server{
- PrivateKey: conf.NodeKey(),
- Name: conf.Name,
- Discovery: !conf.NoDiscovery,
- BootstrapNodes: conf.BootstrapNodes,
- StaticNodes: conf.StaticNodes(),
- TrustedNodes: conf.TrusterNodes(),
- NodeDatabase: nodeDbPath,
- ListenAddr: conf.ListenAddr,
- NAT: conf.NAT,
- Dialer: conf.Dialer,
- NoDial: conf.NoDial,
- MaxPeers: conf.MaxPeers,
- MaxPendingPeers: conf.MaxPendingPeers,
- },
- serviceFuncs: []ServiceConstructor{},
- eventmux: new(event.TypeMux),
- }, nil
- }
- // Register injects a new service into the node's stack. The service created by
- // the passed constructor must be unique in its type with regard to sibling ones.
- func (n *Node) Register(constructor ServiceConstructor) error {
- n.lock.Lock()
- defer n.lock.Unlock()
- if n.server != nil {
- return ErrNodeRunning
- }
- n.serviceFuncs = append(n.serviceFuncs, constructor)
- return nil
- }
- // Start create a live P2P node and starts running it.
- func (n *Node) Start() error {
- n.lock.Lock()
- defer n.lock.Unlock()
- // Short circuit if the node's already running
- if n.server != nil {
- return ErrNodeRunning
- }
- // Otherwise copy and specialize the P2P configuration
- running := new(p2p.Server)
- *running = *n.serverConfig
- services := make(map[reflect.Type]Service)
- for _, constructor := range n.serviceFuncs {
- // Create a new context for the particular service
- ctx := &ServiceContext{
- datadir: n.datadir,
- services: make(map[reflect.Type]Service),
- EventMux: n.eventmux,
- }
- for kind, s := range services { // copy needed for threaded access
- ctx.services[kind] = s
- }
- // Construct and save the service
- service, err := constructor(ctx)
- if err != nil {
- return err
- }
- kind := reflect.TypeOf(service)
- if _, exists := services[kind]; exists {
- return &DuplicateServiceError{Kind: kind}
- }
- services[kind] = service
- }
- // Gather the protocols and start the freshly assembled P2P server
- for _, service := range services {
- running.Protocols = append(running.Protocols, service.Protocols()...)
- }
- if err := running.Start(); err != nil {
- if errno, ok := err.(syscall.Errno); ok && datadirInUseErrnos[uint(errno)] {
- return ErrDatadirUsed
- }
- return err
- }
- // Start each of the services
- started := []reflect.Type{}
- for kind, service := range services {
- // Start the next service, stopping all previous upon failure
- if err := service.Start(running); err != nil {
- for _, kind := range started {
- services[kind].Stop()
- }
- running.Stop()
- return err
- }
- // Mark the service started for potential cleanup
- started = append(started, kind)
- }
- // Finish initializing the startup
- n.services = services
- n.server = running
- n.stop = make(chan struct{})
- return nil
- }
- // Stop terminates a running node along with all it's services. In the node was
- // not started, an error is returned.
- func (n *Node) Stop() error {
- n.lock.Lock()
- defer n.lock.Unlock()
- // Short circuit if the node's not running
- if n.server == nil {
- return ErrNodeStopped
- }
- // Otherwise terminate all the services and the P2P server too
- failure := &StopError{
- Services: make(map[reflect.Type]error),
- }
- for kind, service := range n.services {
- if err := service.Stop(); err != nil {
- failure.Services[kind] = err
- }
- }
- n.server.Stop()
- n.services = nil
- n.server = nil
- close(n.stop)
- if len(failure.Services) > 0 {
- return failure
- }
- return nil
- }
- // Wait blocks the thread until the node is stopped. If the node is not running
- // at the time of invocation, the method immediately returns.
- func (n *Node) Wait() {
- n.lock.RLock()
- if n.server == nil {
- return
- }
- stop := n.stop
- n.lock.RUnlock()
- <-stop
- }
- // Restart terminates a running node and boots up a new one in its place. If the
- // node isn't running, an error is returned.
- func (n *Node) Restart() error {
- if err := n.Stop(); err != nil {
- return err
- }
- if err := n.Start(); err != nil {
- return err
- }
- return nil
- }
- // Server retrieves the currently running P2P network layer. This method is meant
- // only to inspect fields of the currently running server, life cycle management
- // should be left to this Node entity.
- func (n *Node) Server() *p2p.Server {
- n.lock.RLock()
- defer n.lock.RUnlock()
- return n.server
- }
- // Service retrieves a currently running service registered of a specific type.
- func (n *Node) Service(service interface{}) error {
- n.lock.RLock()
- defer n.lock.RUnlock()
- // Short circuit if the node's not running
- if n.server == nil {
- return ErrNodeStopped
- }
- // Otherwise try to find the service to return
- element := reflect.ValueOf(service).Elem()
- if running, ok := n.services[element.Type()]; ok {
- element.Set(reflect.ValueOf(running))
- return nil
- }
- return ErrServiceUnknown
- }
- // DataDir retrieves the current datadir used by the protocol stack.
- func (n *Node) DataDir() string {
- return n.datadir
- }
- // EventMux retrieves the event multiplexer used by all the network services in
- // the current protocol stack.
- func (n *Node) EventMux() *event.TypeMux {
- return n.eventmux
- }
- // APIs returns the collection of RPC descriptor this node offers. This method
- // is just a quick placeholder passthrough for the RPC update, which in the next
- // step will be fully integrated into the node itself.
- func (n *Node) APIs() []rpc.API {
- // Define all the APIs owned by the node itself
- apis := []rpc.API{
- {
- Namespace: "admin",
- Version: "1.0",
- Service: NewPrivateAdminAPI(n),
- }, {
- Namespace: "admin",
- Version: "1.0",
- Service: NewPublicAdminAPI(n),
- Public: true,
- }, {
- Namespace: "debug",
- Version: "1.0",
- Service: debug.Handler,
- }, {
- Namespace: "debug",
- Version: "1.0",
- Service: NewPublicDebugAPI(n),
- Public: true,
- }, {
- Namespace: "web3",
- Version: "1.0",
- Service: NewPublicWeb3API(n),
- Public: true,
- },
- }
- // Inject all the APIs owned by various services
- for _, api := range n.services {
- apis = append(apis, api.APIs()...)
- }
- return apis
- }
|