package p2p

import (
	"blockchain-go/common/gopool"
	"blockchain-go/common/mclock"
	"blockchain-go/p2p/enode"
	"blockchain-go/rlp"
	"errors"
	"fmt"
	"net"
	"sort"
	"sync"
	"time"
)

var (
	// ErrShuttingDown is a sentinel error carrying the message "shutting down".
	// NOTE(review): it is not referenced in the visible part of this file;
	// presumably it is returned while the server or a peer is stopping — confirm.
	ErrShuttingDown = errors.New("shutting down")
)

const (
	// baseProtocolVersion is the version number of the base protocol.
	// NOTE(review): not used in the visible part of this file.
	baseProtocolVersion = 5
	// baseProtocolLength is the number of message codes reserved for the
	// base protocol: handle treats codes below it as base-protocol messages
	// and matchProtocols starts assigning sub-protocol offsets at this value.
	baseProtocolLength = uint64(16)
	// baseProtocolMaxMsgSize is 2 KiB.
	// NOTE(review): presumably the size cap for base-protocol messages; the
	// check is not in the visible part of this file — confirm.
	baseProtocolMaxMsgSize = 2 * 1024

	// NOTE(review): presumably the minimum protocol version at which snappy
	// compression is enabled; not used in the visible part of this file.
	snappyProtocolVersion = 5

	// pingInterval is the delay between two pings sent by Peer.pingLoop.
	pingInterval = 15 * time.Second
)

const (
	// devp2p message codes
	handshakeMsg = 0x00
	discMsg      = 0x01
	pingMsg      = 0x02
	pongMsg      = 0x03
)

// protoHandshake is the RLP structure of the protocol handshake.
type protoHandshake struct {
	Version    uint64
	Name       string
	Caps       []Cap
	ListenPort uint64
	ID         []byte // secp256k1 public key

	// Ignore additional fields (for forward compatibility).
	//Rest []rlp.RawValue `rlp:"tail"`
}

// PeerEventType is the kind of a PeerEvent; the valid values are the
// PeerEventType* constants below.
type PeerEventType string

const (
	// PeerEventTypeAdd is the type of event emitted when a peer is added
	// to a p2p.Server
	PeerEventTypeAdd PeerEventType = "add"

	// PeerEventTypeDrop is the type of event emitted when a peer is
	// dropped from a p2p.Server
	PeerEventTypeDrop PeerEventType = "drop"

	// PeerEventTypeMsgSend is the type of event emitted when a
	// message is successfully sent to a peer
	PeerEventTypeMsgSend PeerEventType = "msgsend"

	// PeerEventTypeMsgRecv is the type of event emitted when a
	// message is received from a peer
	PeerEventTypeMsgRecv PeerEventType = "msgrecv"
)

// PeerEvent is an event emitted when peers are either added or dropped from
// a p2p.Server or when a message is sent or received on a peer connection
type PeerEvent struct {
	Type          PeerEventType `json:"type"`
	Peer          enode.ID      `json:"peer"`
	Error         string        `json:"error,omitempty"`
	Protocol      string        `json:"protocol,omitempty"`
	MsgCode       *uint64       `json:"msg_code,omitempty"`
	MsgSize       *uint32       `json:"msg_size,omitempty"`
	LocalAddress  string        `json:"local,omitempty"`
	RemoteAddress string        `json:"remote,omitempty"`
}

// Peer represents a connected remote node.
type Peer struct { rw *conn running map[string]*protoRW created mclock.AbsTime protoErr chan error closed chan struct{} disc chan DiscReason wg sync.WaitGroup //log log.Logger // events receives message send / receive events if set //events *event.Feed } func newPeer(conn *conn, protocols []Protocol) *Peer { protoMap := matchProtocols(protocols, conn.caps, conn) p := &Peer{ rw: conn, running: protoMap, created: mclock.Now(), protoErr: make(chan error, len(protoMap)+1), closed: make(chan struct{}), } return p } func (p *Peer) Inbound() bool { return p.rw.is(inboundConn) } func (p *Peer) ID() enode.ID { return p.rw.node.ID() } func (p *Peer) RemoteAddr() net.Addr { return p.rw.fd.RemoteAddr() } func (p *Peer) LocalAddr() net.Addr { return p.rw.fd.LocalAddr() } func (p *Peer) run() (remoteRequested bool, err error) { var ( writeStart = make(chan struct{}, 1) writeErr = make(chan error, 1) readErr = make(chan error, 1) reason DiscReason ) p.wg.Add(2) go p.readLoop(readErr) go p.pingLoop() writeStart <- struct{}{} loop: for { select { case err = <-writeErr: if err != nil { reason = DiscNetworkError break loop } writeStart <- struct{}{} case err = <-readErr: if r, ok := err.(DiscReason); ok { remoteRequested = true reason = r } else { reason = DiscNetworkError } break loop case err = <-p.protoErr: reason = discReasonForError(err) break loop case err = <-p.disc: reason = discReasonForError(err) break loop } } close(p.closed) p.rw.close(reason) p.wg.Wait() return remoteRequested, err } func (p *Peer) readLoop(errc chan<- error) { defer p.wg.Done() for { msg, err := p.rw.ReadMsg() if err != nil { errc <- err return } msg.ReceivedAt = time.Now() if err = p.handle(msg); err != nil { errc <- err return } } } func (p *Peer) handle(msg Msg) error { fmt.Printf("code: %v, msg: %v.\n", msg.Code, msg.Payload) switch { case msg.Code == pingMsg: msg.Discard() gopool.Submit(func() { SendItems(p.rw, pongMsg) }) case msg.Code == discMsg: var m struct{ R DiscReason } 
rlp.Decode(msg.Payload, &m) return m.R case msg.Code < baseProtocolLength: return msg.Discard() } return nil } func (p *Peer) pingLoop() { ping := time.NewTimer(pingInterval) defer p.wg.Done() defer ping.Stop() for { select { case <-ping.C: if err := SendItems(p.rw, pingMsg); err != nil { p.protoErr <- err return } ping.Reset(pingInterval) case <-p.closed: return } } } type protoRW struct { Protocol in chan Msg // receives read messages closed <-chan struct{} // receives when peer is shutting down wstart <-chan struct{} // receives when write may start werr chan<- error // for write results offset uint64 w MsgWriter } func matchProtocols(protocols []Protocol, caps []Cap, rw MsgReadWriter) map[string]*protoRW { sort.Sort(capsByNameAndVersion(caps)) offset := baseProtocolLength result := make(map[string]*protoRW) outer: for _, capability := range caps { for _, proto := range protocols { if proto.Name == capability.Name && proto.Version == capability.Version { if old := result[capability.Name]; old != nil { offset -= old.Length } result[capability.Name] = &protoRW{Protocol: proto, offset: offset, in: make(chan Msg), w: rw} offset += proto.Length continue outer } } } return result }