| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314 |
- // Copyright 2015 The go-ethereum Authors
- // This file is part of the go-ethereum library.
- //
- // The go-ethereum library is free software: you can redistribute it and/or modify
- // it under the terms of the GNU Lesser General Public License as published by
- // the Free Software Foundation, either version 3 of the License, or
- // (at your option) any later version.
- //
- // The go-ethereum library is distributed in the hope that it will be useful,
- // but WITHOUT ANY WARRANTY; without even the implied warranty of
- // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- // GNU Lesser General Public License for more details.
- //
- // You should have received a copy of the GNU Lesser General Public License
- // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
- // Package node represents the Ethereum protocol stack container.
- package node
- import (
- "errors"
- "os"
- "path/filepath"
- "reflect"
- "sync"
- "syscall"
- "github.com/ethereum/go-ethereum/event"
- "github.com/ethereum/go-ethereum/p2p"
- )
// Sentinel errors reported by the node's life cycle and service registry
// methods, together with the errno lookup table consulted by Start.
var (
	// ErrDatadirUsed is returned by Start when the P2P server fails to boot
	// with one of the errno values listed in datadirInUseErrnos.
	ErrDatadirUsed = errors.New("datadir already used")

	// ErrNodeStopped is returned by Stop when the node is not running.
	ErrNodeStopped = errors.New("node not started")

	// ErrNodeRunning is returned by Register, Unregister and Start when the
	// node is already running.
	ErrNodeRunning = errors.New("node already running")

	// ErrServiceUnknown is returned by Unregister for an id that was never
	// registered and by SingletonService when no running service matches.
	ErrServiceUnknown = errors.New("service not registered")

	// ErrServiceRegistered is returned by Register when the id is taken.
	ErrServiceRegistered = errors.New("service already registered")

	// datadirInUseErrnos lists the errno values that Start translates into
	// ErrDatadirUsed.
	// NOTE(review): presumably the platform specific "resource locked" codes;
	// confirm the per-OS meaning of 11, 32 and 35.
	datadirInUseErrnos = map[uint]bool{
		11: true,
		32: true,
		35: true,
	}
)
- // Node represents a P2P node into which arbitrary services might be registered.
- type Node struct {
- datadir string // Path to the currently used data directory
- eventmux *event.TypeMux // Event multiplexer used between the services of a stack
- serverConfig *p2p.Server // Configuration of the underlying P2P networking layer
- server *p2p.Server // Currently running P2P networking layer
- serviceIndex map[string]ServiceConstructor // Set of services currently registered in the node
- serviceOrder []string // Service construction order to handle dependencies
- services map[string]Service // Currently running services
- stop chan struct{} // Channel to wait for termination notifications
- lock sync.RWMutex
- }
- // New creates a new P2P node, ready for protocol registration.
- func New(conf *Config) (*Node, error) {
- // Ensure the data directory exists, failing if it cannot be created
- if conf.DataDir != "" {
- if err := os.MkdirAll(conf.DataDir, 0700); err != nil {
- return nil, err
- }
- }
- // Assemble the networking layer and the node itself
- nodeDbPath := ""
- if conf.DataDir != "" {
- nodeDbPath = filepath.Join(conf.DataDir, datadirNodeDatabase)
- }
- return &Node{
- datadir: conf.DataDir,
- serverConfig: &p2p.Server{
- PrivateKey: conf.NodeKey(),
- Name: conf.Name,
- Discovery: !conf.NoDiscovery,
- BootstrapNodes: conf.BootstrapNodes,
- StaticNodes: conf.StaticNodes(),
- TrustedNodes: conf.TrusterNodes(),
- NodeDatabase: nodeDbPath,
- ListenAddr: conf.ListenAddr,
- NAT: conf.NAT,
- Dialer: conf.Dialer,
- NoDial: conf.NoDial,
- MaxPeers: conf.MaxPeers,
- MaxPendingPeers: conf.MaxPendingPeers,
- },
- serviceIndex: make(map[string]ServiceConstructor),
- serviceOrder: []string{},
- eventmux: new(event.TypeMux),
- }, nil
- }
- // Register injects a new service into the node's stack.
- func (n *Node) Register(id string, constructor ServiceConstructor) error {
- n.lock.Lock()
- defer n.lock.Unlock()
- // Short circuit if the node is running or if the id is taken
- if n.server != nil {
- return ErrNodeRunning
- }
- if _, ok := n.serviceIndex[id]; ok {
- return ErrServiceRegistered
- }
- // Otherwise register the service and return
- n.serviceOrder = append(n.serviceOrder, id)
- n.serviceIndex[id] = constructor
- return nil
- }
- // Unregister removes a service from a node's stack. If the node is currently
- // running, an error will be returned.
- func (n *Node) Unregister(id string) error {
- n.lock.Lock()
- defer n.lock.Unlock()
- // Short circuit if the node is running, or if the service is unknown
- if n.server != nil {
- return ErrNodeRunning
- }
- if _, ok := n.serviceIndex[id]; !ok {
- return ErrServiceUnknown
- }
- // Otherwise drop the service and return
- delete(n.serviceIndex, id)
- for i, service := range n.serviceOrder {
- if service == id {
- n.serviceOrder = append(n.serviceOrder[:i], n.serviceOrder[i+1:]...)
- break
- }
- }
- return nil
- }
- // Start create a live P2P node and starts running it.
- func (n *Node) Start() error {
- n.lock.Lock()
- defer n.lock.Unlock()
- // Short circuit if the node's already running
- if n.server != nil {
- return ErrNodeRunning
- }
- // Otherwise copy and specialize the P2P configuration
- running := new(p2p.Server)
- *running = *n.serverConfig
- services := make(map[string]Service)
- for _, id := range n.serviceOrder {
- constructor := n.serviceIndex[id]
- // Create a new context for the particular service
- ctx := &ServiceContext{
- datadir: n.datadir,
- services: make(map[string]Service),
- EventMux: n.eventmux,
- }
- for id, s := range services { // copy needed for threaded access
- ctx.services[id] = s
- }
- // Construct and save the service
- service, err := constructor(ctx)
- if err != nil {
- return err
- }
- services[id] = service
- }
- // Gather the protocols and start the freshly assembled P2P server
- for _, service := range services {
- running.Protocols = append(running.Protocols, service.Protocols()...)
- }
- if err := running.Start(); err != nil {
- if errno, ok := err.(syscall.Errno); ok && datadirInUseErrnos[uint(errno)] {
- return ErrDatadirUsed
- }
- return err
- }
- // Start each of the services
- started := []string{}
- for id, service := range services {
- // Start the next service, stopping all previous upon failure
- if err := service.Start(running); err != nil {
- for _, id := range started {
- services[id].Stop()
- }
- running.Stop()
- return err
- }
- // Mark the service started for potential cleanup
- started = append(started, id)
- }
- // Finish initializing the startup
- n.services = services
- n.server = running
- n.stop = make(chan struct{})
- return nil
- }
- // Stop terminates a running node along with all it's services. In the node was
- // not started, an error is returned.
- func (n *Node) Stop() error {
- n.lock.Lock()
- defer n.lock.Unlock()
- // Short circuit if the node's not running
- if n.server == nil {
- return ErrNodeStopped
- }
- // Otherwise terminate all the services and the P2P server too
- failure := &StopError{
- Services: make(map[string]error),
- }
- for id, service := range n.services {
- if err := service.Stop(); err != nil {
- failure.Services[id] = err
- }
- }
- n.server.Stop()
- n.services = nil
- n.server = nil
- close(n.stop)
- if len(failure.Services) > 0 {
- return failure
- }
- return nil
- }
- // Wait blocks the thread until the node is stopped. If the node is not running
- // at the time of invocation, the method immediately returns.
- func (n *Node) Wait() {
- n.lock.RLock()
- if n.server == nil {
- return
- }
- stop := n.stop
- n.lock.RUnlock()
- <-stop
- }
- // Restart terminates a running node and boots up a new one in its place. If the
- // node isn't running, an error is returned.
- func (n *Node) Restart() error {
- if err := n.Stop(); err != nil {
- return err
- }
- if err := n.Start(); err != nil {
- return err
- }
- return nil
- }
- // Server retrieves the currently running P2P network layer. This method is meant
- // only to inspect fields of the currently running server, life cycle management
- // should be left to this Node entity.
- func (n *Node) Server() *p2p.Server {
- n.lock.RLock()
- defer n.lock.RUnlock()
- return n.server
- }
- // Service retrieves a currently running service registered under a given id.
- func (n *Node) Service(id string) Service {
- n.lock.RLock()
- defer n.lock.RUnlock()
- // Short circuit if the node's not running
- if n.server == nil {
- return nil
- }
- return n.services[id]
- }
- // SingletonService retrieves a currently running service using a specific type
- // implementing the Service interface. This is a utility function for scenarios
- // where it is known that only one instance of a given service type is running,
- // allowing to access services without needing to know their specific id with
- // which they were registered. Note, this method uses reflection, so do not run
- // in a tight loop.
- func (n *Node) SingletonService(service interface{}) (string, error) {
- n.lock.RLock()
- defer n.lock.RUnlock()
- // Short circuit if the node's not running
- if n.server == nil {
- return "", ErrServiceUnknown
- }
- // Otherwise try to find the service to return
- for id, running := range n.services {
- if reflect.TypeOf(running) == reflect.ValueOf(service).Elem().Type() {
- reflect.ValueOf(service).Elem().Set(reflect.ValueOf(running))
- return id, nil
- }
- }
- return "", ErrServiceUnknown
- }
- // DataDir retrieves the current datadir used by the protocol stack.
- func (n *Node) DataDir() string {
- return n.datadir
- }
- // EventMux retrieves the event multiplexer used by all the network services in
- // the current protocol stack.
- func (n *Node) EventMux() *event.TypeMux {
- return n.eventmux
- }
|