| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454 |
- package p2p
- import (
- "bufio"
- "bytes"
- "fmt"
- "io"
- "io/ioutil"
- "net"
- "sort"
- "sync"
- "time"
- "github.com/ethereum/go-ethereum/event"
- "github.com/ethereum/go-ethereum/logger"
- )
- // peerAddr is the structure of a peer list element.
- // It is also a valid net.Addr.
- type peerAddr struct {
- IP net.IP
- Port uint64
- Pubkey []byte // optional
- }
- func newPeerAddr(addr net.Addr, pubkey []byte) *peerAddr {
- n := addr.Network()
- if n != "tcp" && n != "tcp4" && n != "tcp6" {
- // for testing with non-TCP
- return &peerAddr{net.ParseIP("127.0.0.1"), 30303, pubkey}
- }
- ta := addr.(*net.TCPAddr)
- return &peerAddr{ta.IP, uint64(ta.Port), pubkey}
- }
- func (d peerAddr) Network() string {
- if d.IP.To4() != nil {
- return "tcp4"
- } else {
- return "tcp6"
- }
- }
- func (d peerAddr) String() string {
- return fmt.Sprintf("%v:%d", d.IP, d.Port)
- }
- func (d peerAddr) RlpData() interface{} {
- return []interface{}{d.IP, d.Port, d.Pubkey}
- }
- // Peer represents a remote peer.
- type Peer struct {
- // Peers have all the log methods.
- // Use them to display messages related to the peer.
- *logger.Logger
- infolock sync.Mutex
- identity ClientIdentity
- caps []Cap
- listenAddr *peerAddr // what remote peer is listening on
- dialAddr *peerAddr // non-nil if dialing
- // The mutex protects the connection
- // so only one protocol can write at a time.
- writeMu sync.Mutex
- conn net.Conn
- bufconn *bufio.ReadWriter
- // These fields maintain the running protocols.
- protocols []Protocol
- runBaseProtocol bool // for testing
- runlock sync.RWMutex // protects running
- running map[string]*proto
- protoWG sync.WaitGroup
- protoErr chan error
- closed chan struct{}
- disc chan DiscReason
- activity event.TypeMux // for activity events
- slot int // index into Server peer list
- // These fields are kept so base protocol can access them.
- // TODO: this should be one or more interfaces
- ourID ClientIdentity // client id of the Server
- ourListenAddr *peerAddr // listen addr of Server, nil if not listening
- newPeerAddr chan<- *peerAddr // tell server about received peers
- otherPeers func() []*Peer // should return the list of all peers
- pubkeyHook func(*peerAddr) error // called at end of handshake to validate pubkey
- }
- // NewPeer returns a peer for testing purposes.
- func NewPeer(id ClientIdentity, caps []Cap) *Peer {
- conn, _ := net.Pipe()
- peer := newPeer(conn, nil, nil)
- peer.setHandshakeInfo(id, nil, caps)
- return peer
- }
- func newServerPeer(server *Server, conn net.Conn, dialAddr *peerAddr) *Peer {
- p := newPeer(conn, server.Protocols, dialAddr)
- p.ourID = server.Identity
- p.newPeerAddr = server.peerConnect
- p.otherPeers = server.Peers
- p.pubkeyHook = server.verifyPeer
- p.runBaseProtocol = true
- // laddr can be updated concurrently by NAT traversal.
- // newServerPeer must be called with the server lock held.
- if server.laddr != nil {
- p.ourListenAddr = newPeerAddr(server.laddr, server.Identity.Pubkey())
- }
- return p
- }
- func newPeer(conn net.Conn, protocols []Protocol, dialAddr *peerAddr) *Peer {
- p := &Peer{
- Logger: logger.NewLogger("P2P " + conn.RemoteAddr().String()),
- conn: conn,
- dialAddr: dialAddr,
- bufconn: bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)),
- protocols: protocols,
- running: make(map[string]*proto),
- disc: make(chan DiscReason),
- protoErr: make(chan error),
- closed: make(chan struct{}),
- }
- return p
- }
- // Identity returns the client identity of the remote peer. The
- // identity can be nil if the peer has not yet completed the
- // handshake.
- func (p *Peer) Identity() ClientIdentity {
- p.infolock.Lock()
- defer p.infolock.Unlock()
- return p.identity
- }
- // Caps returns the capabilities (supported subprotocols) of the remote peer.
- func (p *Peer) Caps() []Cap {
- p.infolock.Lock()
- defer p.infolock.Unlock()
- return p.caps
- }
- func (p *Peer) setHandshakeInfo(id ClientIdentity, laddr *peerAddr, caps []Cap) {
- p.infolock.Lock()
- p.identity = id
- p.listenAddr = laddr
- p.caps = caps
- p.infolock.Unlock()
- }
- // RemoteAddr returns the remote address of the network connection.
- func (p *Peer) RemoteAddr() net.Addr {
- return p.conn.RemoteAddr()
- }
- // LocalAddr returns the local address of the network connection.
- func (p *Peer) LocalAddr() net.Addr {
- return p.conn.LocalAddr()
- }
- // Disconnect terminates the peer connection with the given reason.
- // It returns immediately and does not wait until the connection is closed.
- func (p *Peer) Disconnect(reason DiscReason) {
- select {
- case p.disc <- reason:
- case <-p.closed:
- }
- }
- // String implements fmt.Stringer.
- func (p *Peer) String() string {
- kind := "inbound"
- p.infolock.Lock()
- if p.dialAddr != nil {
- kind = "outbound"
- }
- p.infolock.Unlock()
- return fmt.Sprintf("Peer(%p %v %s)", p, p.conn.RemoteAddr(), kind)
- }
- const (
- // maximum amount of time allowed for reading a message
- msgReadTimeout = 5 * time.Second
- // maximum amount of time allowed for writing a message
- msgWriteTimeout = 5 * time.Second
- // messages smaller than this many bytes will be read at
- // once before passing them to a protocol.
- wholePayloadSize = 64 * 1024
- )
- var (
- inactivityTimeout = 2 * time.Second
- disconnectGracePeriod = 2 * time.Second
- )
- func (p *Peer) loop() (reason DiscReason, err error) {
- defer p.activity.Stop()
- defer p.closeProtocols()
- defer close(p.closed)
- defer p.conn.Close()
- // read loop
- readMsg := make(chan Msg)
- readErr := make(chan error)
- readNext := make(chan bool, 1)
- protoDone := make(chan struct{}, 1)
- go p.readLoop(readMsg, readErr, readNext)
- readNext <- true
- if p.runBaseProtocol {
- p.startBaseProtocol()
- }
- loop:
- for {
- select {
- case msg := <-readMsg:
- // a new message has arrived.
- var wait bool
- if wait, err = p.dispatch(msg, protoDone); err != nil {
- p.Errorf("msg dispatch error: %v\n", err)
- reason = discReasonForError(err)
- break loop
- }
- if !wait {
- // Msg has already been read completely, continue with next message.
- readNext <- true
- }
- p.activity.Post(time.Now())
- case <-protoDone:
- // protocol has consumed the message payload,
- // we can continue reading from the socket.
- readNext <- true
- case err := <-readErr:
- // read failed. there is no need to run the
- // polite disconnect sequence because the connection
- // is probably dead anyway.
- // TODO: handle write errors as well
- return DiscNetworkError, err
- case err = <-p.protoErr:
- reason = discReasonForError(err)
- break loop
- case reason = <-p.disc:
- break loop
- }
- }
- // wait for read loop to return.
- close(readNext)
- <-readErr
- // tell the remote end to disconnect
- done := make(chan struct{})
- go func() {
- p.conn.SetDeadline(time.Now().Add(disconnectGracePeriod))
- p.writeMsg(NewMsg(discMsg, reason), disconnectGracePeriod)
- io.Copy(ioutil.Discard, p.conn)
- close(done)
- }()
- select {
- case <-done:
- case <-time.After(disconnectGracePeriod):
- }
- return reason, err
- }
- func (p *Peer) readLoop(msgc chan<- Msg, errc chan<- error, unblock <-chan bool) {
- for _ = range unblock {
- p.conn.SetReadDeadline(time.Now().Add(msgReadTimeout))
- if msg, err := readMsg(p.bufconn); err != nil {
- errc <- err
- } else {
- msgc <- msg
- }
- }
- close(errc)
- }
- func (p *Peer) dispatch(msg Msg, protoDone chan struct{}) (wait bool, err error) {
- proto, err := p.getProto(msg.Code)
- if err != nil {
- return false, err
- }
- if msg.Size <= wholePayloadSize {
- // optimization: msg is small enough, read all
- // of it and move on to the next message
- buf, err := ioutil.ReadAll(msg.Payload)
- if err != nil {
- return false, err
- }
- msg.Payload = bytes.NewReader(buf)
- proto.in <- msg
- } else {
- wait = true
- pr := &eofSignal{msg.Payload, protoDone}
- msg.Payload = pr
- proto.in <- msg
- }
- return wait, nil
- }
- func (p *Peer) startBaseProtocol() {
- p.runlock.Lock()
- defer p.runlock.Unlock()
- p.running[""] = p.startProto(0, Protocol{
- Length: baseProtocolLength,
- Run: runBaseProtocol,
- })
- }
- // startProtocols starts matching named subprotocols.
- func (p *Peer) startSubprotocols(caps []Cap) {
- sort.Sort(capsByName(caps))
- p.runlock.Lock()
- defer p.runlock.Unlock()
- offset := baseProtocolLength
- outer:
- for _, cap := range caps {
- for _, proto := range p.protocols {
- if proto.Name == cap.Name &&
- proto.Version == cap.Version &&
- p.running[cap.Name] == nil {
- p.running[cap.Name] = p.startProto(offset, proto)
- offset += proto.Length
- continue outer
- }
- }
- }
- }
- func (p *Peer) startProto(offset uint64, impl Protocol) *proto {
- rw := &proto{
- in: make(chan Msg),
- offset: offset,
- maxcode: impl.Length,
- peer: p,
- }
- p.protoWG.Add(1)
- go func() {
- err := impl.Run(p, rw)
- if err == nil {
- p.Infof("protocol %q returned", impl.Name)
- err = newPeerError(errMisc, "protocol returned")
- } else {
- p.Errorf("protocol %q error: %v\n", impl.Name, err)
- }
- select {
- case p.protoErr <- err:
- case <-p.closed:
- }
- p.protoWG.Done()
- }()
- return rw
- }
- // getProto finds the protocol responsible for handling
- // the given message code.
- func (p *Peer) getProto(code uint64) (*proto, error) {
- p.runlock.RLock()
- defer p.runlock.RUnlock()
- for _, proto := range p.running {
- if code >= proto.offset && code < proto.offset+proto.maxcode {
- return proto, nil
- }
- }
- return nil, newPeerError(errInvalidMsgCode, "%d", code)
- }
- func (p *Peer) closeProtocols() {
- p.runlock.RLock()
- for _, p := range p.running {
- close(p.in)
- }
- p.runlock.RUnlock()
- p.protoWG.Wait()
- }
- // writeProtoMsg sends the given message on behalf of the given named protocol.
- func (p *Peer) writeProtoMsg(protoName string, msg Msg) error {
- p.runlock.RLock()
- proto, ok := p.running[protoName]
- p.runlock.RUnlock()
- if !ok {
- return fmt.Errorf("protocol %s not handled by peer", protoName)
- }
- if msg.Code >= proto.maxcode {
- return newPeerError(errInvalidMsgCode, "code %x is out of range for protocol %q", msg.Code, protoName)
- }
- msg.Code += proto.offset
- return p.writeMsg(msg, msgWriteTimeout)
- }
- // writeMsg writes a message to the connection.
- func (p *Peer) writeMsg(msg Msg, timeout time.Duration) error {
- p.writeMu.Lock()
- defer p.writeMu.Unlock()
- p.conn.SetWriteDeadline(time.Now().Add(timeout))
- if err := writeMsg(p.bufconn, msg); err != nil {
- return newPeerError(errWrite, "%v", err)
- }
- return p.bufconn.Flush()
- }
- type proto struct {
- name string
- in chan Msg
- maxcode, offset uint64
- peer *Peer
- }
- func (rw *proto) WriteMsg(msg Msg) error {
- if msg.Code >= rw.maxcode {
- return newPeerError(errInvalidMsgCode, "not handled")
- }
- msg.Code += rw.offset
- return rw.peer.writeMsg(msg, msgWriteTimeout)
- }
- func (rw *proto) EncodeMsg(code uint64, data ...interface{}) error {
- return rw.WriteMsg(NewMsg(code, data))
- }
- func (rw *proto) ReadMsg() (Msg, error) {
- msg, ok := <-rw.in
- if !ok {
- return msg, io.EOF
- }
- msg.Code -= rw.offset
- return msg, nil
- }
- // eofSignal wraps a reader with eof signaling.
- // the eof channel is closed when the wrapped reader
- // reaches EOF.
- type eofSignal struct {
- wrapped io.Reader
- eof chan<- struct{}
- }
- func (r *eofSignal) Read(buf []byte) (int, error) {
- n, err := r.wrapped.Read(buf)
- if err != nil {
- r.eof <- struct{}{} // tell Peer that msg has been consumed
- }
- return n, err
- }
|