| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317 |
- package p2p
- import (
- "bytes"
- "net"
- "sort"
- "time"
- "github.com/ethereum/go-ethereum/ethutil"
- )
- // Protocol is implemented by P2P subprotocols.
- type Protocol interface {
- // Start is called when the protocol becomes active.
- // It should read and write messages from rw.
- // Messages must be fully consumed.
- //
- // The connection is closed when Start returns. It should return
- // any protocol-level error (such as an I/O error) that is
- // encountered.
- Start(peer *Peer, rw MsgReadWriter) error
- // Offset should return the number of message codes
- // used by the protocol.
- Offset() MsgCode
- }
- type MsgReader interface {
- ReadMsg() (Msg, error)
- }
- type MsgWriter interface {
- WriteMsg(Msg) error
- }
- // MsgReadWriter is passed to protocols. Protocol implementations can
- // use it to write messages back to a connected peer.
- type MsgReadWriter interface {
- MsgReader
- MsgWriter
- }
- type MsgHandler func(code MsgCode, data *ethutil.Value) error
- // MsgLoop reads messages off the given reader and
- // calls the handler function for each decoded message until
- // it returns an error or the peer connection is closed.
- //
- // If a message is larger than the given maximum size, RunProtocol
- // returns an appropriate error.n
- func MsgLoop(r MsgReader, maxsize uint32, handler MsgHandler) error {
- for {
- msg, err := r.ReadMsg()
- if err != nil {
- return err
- }
- if msg.Size > maxsize {
- return NewPeerError(InvalidMsg, "size %d exceeds maximum size of %d", msg.Size, maxsize)
- }
- value, err := msg.Data()
- if err != nil {
- return err
- }
- if err := handler(msg.Code, value); err != nil {
- return err
- }
- }
- }
- // the ÐΞVp2p base protocol
- type baseProtocol struct {
- rw MsgReadWriter
- peer *Peer
- }
- type bpMsg struct {
- code MsgCode
- data *ethutil.Value
- }
// Base protocol version and keepalive timing.
const (
	// p2pVersion is the protocol version a remote handshake must carry;
	// handleHandshake rejects any other value.
	p2pVersion = 0

	// pingTimeout is the interval the keepalive timer in loop is armed with.
	pingTimeout = 2 * time.Second

	// pingGracePeriod is not referenced in the visible part of this file.
	pingGracePeriod = 2 * time.Second
)
// Message codes of the base protocol. The values are written out
// explicitly; they are the consecutive integers 0 through 5.
const (
	handshakeMsg = 0x00 // first message in each direction (see Start)
	discMsg      = 0x01 // disconnect request carrying a reason code
	pingMsg      = 0x02 // keepalive request
	pongMsg      = 0x03 // keepalive reply
	getPeersMsg  = 0x04 // request for the list of connected peers
	peersMsg     = 0x05 // list of peers
)
- const (
- baseProtocolOffset MsgCode = 16
- baseProtocolMaxMsgSize = 500 * 1024
- )
// DiscReason is a one-byte disconnect reason code.
type DiscReason byte

const (
	// Values are given explicitly instead of by iota because these values are
	// defined by the wire protocol spec; it is easier for humans to ensure
	// correctness when values are explicit.
	DiscRequested           = 0x00
	DiscNetworkError        = 0x01
	DiscProtocolError       = 0x02
	DiscUselessPeer         = 0x03
	DiscTooManyPeers        = 0x04
	DiscAlreadyConnected    = 0x05
	DiscIncompatibleVersion = 0x06
	DiscInvalidIdentity     = 0x07
	DiscQuitting            = 0x08
	DiscUnexpectedIdentity  = 0x09
	DiscSelf                = 0x0a
	DiscReadTimeout         = 0x0b
	DiscSubprotocolError    = 0x10
)

// discReasonToString maps each defined reason code to its description.
// Codes 0x0c through 0x0f have no constant and therefore hold the empty
// string.
var discReasonToString = [DiscSubprotocolError + 1]string{
	DiscRequested:           "Disconnect requested",
	DiscNetworkError:        "Network error",
	DiscProtocolError:       "Breach of protocol",
	DiscUselessPeer:         "Useless peer",
	DiscTooManyPeers:        "Too many peers",
	DiscAlreadyConnected:    "Already connected",
	DiscIncompatibleVersion: "Incompatible P2P protocol version",
	DiscInvalidIdentity:     "Invalid node identity",
	DiscQuitting:            "Client quitting",
	DiscUnexpectedIdentity:  "Unexpected identity",
	DiscSelf:                "Connected to self",
	DiscReadTimeout:         "Read timeout",
	DiscSubprotocolError:    "Subprotocol error",
}

// String returns the description of d, or "Unknown" when d is not one of
// the defined reason codes.
//
// The value of d can originate from a remote peer, so every possible byte
// must be handled without panicking: codes at or beyond the end of the
// table, and codes inside the table that have no description, both yield
// "Unknown".
func (d DiscReason) String() string {
	// Use >= rather than >: an index equal to the table length is
	// already out of range.
	if int(d) >= len(discReasonToString) {
		return "Unknown"
	}
	if text := discReasonToString[d]; text != "" {
		return text
	}
	return "Unknown"
}
- func (bp *baseProtocol) Ping() {
- }
- func (bp *baseProtocol) Offset() MsgCode {
- return baseProtocolOffset
- }
- func (bp *baseProtocol) Start(peer *Peer, rw MsgReadWriter) error {
- bp.peer, bp.rw = peer, rw
- // Do the handshake.
- // TODO: disconnect is valid before handshake, too.
- rw.WriteMsg(bp.peer.server.handshakeMsg())
- msg, err := rw.ReadMsg()
- if err != nil {
- return err
- }
- if msg.Code != handshakeMsg {
- return NewPeerError(ProtocolBreach, " first message must be handshake")
- }
- data, err := msg.Data()
- if err != nil {
- return NewPeerError(InvalidMsg, "%v", err)
- }
- if err := bp.handleHandshake(data); err != nil {
- return err
- }
- msgin := make(chan bpMsg)
- done := make(chan error, 1)
- go func() {
- done <- MsgLoop(rw, baseProtocolMaxMsgSize,
- func(code MsgCode, data *ethutil.Value) error {
- msgin <- bpMsg{code, data}
- return nil
- })
- }()
- return bp.loop(msgin, done)
- }
- func (bp *baseProtocol) loop(msgin <-chan bpMsg, quit <-chan error) error {
- logger.Debugf("pingpong keepalive started at %v\n", time.Now())
- messenger := bp.rw.(*proto).messenger
- pingTimer := time.NewTimer(pingTimeout)
- pinged := true
- for {
- select {
- case msg := <-msgin:
- if err := bp.handle(msg.code, msg.data); err != nil {
- return err
- }
- case err := <-quit:
- return err
- case <-messenger.pulse:
- pingTimer.Reset(pingTimeout)
- pinged = false
- case <-pingTimer.C:
- if pinged {
- return NewPeerError(PingTimeout, "")
- }
- logger.Debugf("pinging at %v\n", time.Now())
- if err := bp.rw.WriteMsg(NewMsg(pingMsg)); err != nil {
- return NewPeerError(WriteError, "%v", err)
- }
- pinged = true
- pingTimer.Reset(pingTimeout)
- }
- }
- }
- func (bp *baseProtocol) handle(code MsgCode, data *ethutil.Value) error {
- switch code {
- case handshakeMsg:
- return NewPeerError(ProtocolBreach, " extra handshake received")
- case discMsg:
- logger.Infof("Disconnect requested from peer %v, reason", DiscReason(data.Get(0).Uint()))
- bp.peer.server.PeerDisconnect() <- DisconnectRequest{
- addr: bp.peer.Address,
- reason: DiscRequested,
- }
- case pingMsg:
- return bp.rw.WriteMsg(NewMsg(pongMsg))
- case pongMsg:
- // reply for ping
- case getPeersMsg:
- // Peer asked for list of connected peers.
- peersRLP := bp.peer.server.encodedPeerList()
- if peersRLP != nil {
- msg := Msg{
- Code: peersMsg,
- Size: uint32(len(peersRLP)),
- Payload: bytes.NewReader(peersRLP),
- }
- return bp.rw.WriteMsg(msg)
- }
- case peersMsg:
- bp.handlePeers(data)
- default:
- return NewPeerError(InvalidMsgCode, "unknown message code %v", code)
- }
- return nil
- }
- func (bp *baseProtocol) handlePeers(data *ethutil.Value) {
- it := data.NewIterator()
- for it.Next() {
- ip := net.IP(it.Value().Get(0).Bytes())
- port := it.Value().Get(1).Uint()
- address := &net.TCPAddr{IP: ip, Port: int(port)}
- go bp.peer.server.PeerConnect(address)
- }
- }
- func (bp *baseProtocol) handleHandshake(c *ethutil.Value) error {
- var (
- remoteVersion = c.Get(0).Uint()
- id = c.Get(1).Str()
- caps = c.Get(2)
- port = c.Get(3).Uint()
- pubkey = c.Get(4).Bytes()
- )
- // Check correctness of p2p protocol version
- if remoteVersion != p2pVersion {
- return NewPeerError(P2PVersionMismatch, "Require protocol %d, received %d\n", p2pVersion, remoteVersion)
- }
- // Handle the pub key (validation, uniqueness)
- if len(pubkey) == 0 {
- return NewPeerError(PubkeyMissing, "not supplied in handshake.")
- }
- if len(pubkey) != 64 {
- return NewPeerError(PubkeyInvalid, "require 512 bit, got %v", len(pubkey)*8)
- }
- // self connect detection
- if bytes.Compare(bp.peer.server.ClientIdentity().Pubkey()[1:], pubkey) == 0 {
- return NewPeerError(PubkeyForbidden, "not allowed to connect to bp")
- }
- // register pubkey on server. this also sets the pubkey on the peer (need lock)
- if err := bp.peer.server.RegisterPubkey(bp.peer, pubkey); err != nil {
- return NewPeerError(PubkeyForbidden, err.Error())
- }
- // check port
- if bp.peer.Inbound {
- uint16port := uint16(port)
- if bp.peer.Port > 0 && bp.peer.Port != uint16port {
- return NewPeerError(PortMismatch, "port mismatch: %v != %v", bp.peer.Port, port)
- } else {
- bp.peer.Port = uint16port
- }
- }
- capsIt := caps.NewIterator()
- for capsIt.Next() {
- cap := capsIt.Value().Str()
- bp.peer.Caps = append(bp.peer.Caps, cap)
- }
- sort.Strings(bp.peer.Caps)
- bp.rw.(*proto).messenger.setRemoteProtocols(bp.peer.Caps)
- bp.peer.Id = id
- return nil
- }
|