| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214 |
- package p2p
- import (
- "blockchain-go/common/mclock"
- "blockchain-go/p2p/enode"
- "errors"
- "net"
- "sort"
- "sync"
- "time"
- )
// ErrShuttingDown is a sentinel error carrying the message "shutting down".
// NOTE(review): presumably returned while the server is stopping; its call
// sites are not visible here.
var ErrShuttingDown = errors.New("shutting down")
// Parameters of the base protocol.
const (
	// baseProtocolVersion is presumably the version advertised in the
	// protocol handshake — TODO confirm against the handshake code.
	baseProtocolVersion = 5

	// baseProtocolLength is the offset that matchProtocols assigns to the
	// first matched sub-protocol.
	baseProtocolLength uint64 = 16

	// baseProtocolMaxMsgSize is 2048; presumably a byte limit for base
	// protocol messages — TODO confirm the unit at the point of use.
	baseProtocolMaxMsgSize = 2 * 1024

	// snappyProtocolVersion presumably marks the protocol version from which
	// snappy compression applies — TODO confirm.
	snappyProtocolVersion = 5

	// pingInterval is 15 seconds; presumably the keepalive period intended
	// for pingLoop, which is not implemented yet.
	pingInterval = 15 * time.Second
)
const (
	// devp2p message codes.
	//
	// These are untyped integer constants. The values are spelled out
	// explicitly rather than derived, presumably because they are wire
	// values that must not shift if the list is edited.
	handshakeMsg = 0x00
	discMsg      = 0x01
	pingMsg      = 0x02
	pongMsg      = 0x03
)
// protoHandshake is the RLP structure of the protocol handshake.
//
// NOTE(review): the field order presumably defines the encoded layout, so
// fields should not be reordered — confirm against the RLP encoder in use.
type protoHandshake struct {
	Version    uint64
	Name       string
	Caps       []Cap
	ListenPort uint64
	ID         []byte // secp256k1 public key

	// Ignore additional fields (for forward compatibility).
	// NOTE(review): the tail field below is commented out, so this struct
	// does not capture trailing handshake fields — confirm that the decoder
	// tolerates them.
	//Rest []rlp.RawValue `rlp:"tail"`
}
// PeerEventType is the kind of a PeerEvent. Its string value is what appears
// in the "type" field of the JSON-encoded event.
type PeerEventType string

const (
	// PeerEventTypeAdd is the type of event emitted when a peer is added
	// to a p2p.Server.
	PeerEventTypeAdd PeerEventType = "add"

	// PeerEventTypeDrop is the type of event emitted when a peer is
	// dropped from a p2p.Server.
	PeerEventTypeDrop PeerEventType = "drop"

	// PeerEventTypeMsgSend is the type of event emitted when a
	// message is successfully sent to a peer.
	PeerEventTypeMsgSend PeerEventType = "msgsend"

	// PeerEventTypeMsgRecv is the type of event emitted when a
	// message is received from a peer.
	PeerEventTypeMsgRecv PeerEventType = "msgrecv"
)
// PeerEvent is an event emitted when peers are either added or dropped from
// a p2p.Server or when a message is sent or received on a peer connection.
//
// Every field except Type and Peer is tagged omitempty and is therefore left
// out of the JSON encoding when it holds its zero value (nil for the pointer
// fields).
type PeerEvent struct {
	// Type says which kind of event this is.
	Type PeerEventType `json:"type"`

	// Peer is the node ID of the peer the event refers to.
	Peer enode.ID `json:"peer"`

	// Error is presumably the reason text for a drop event — TODO confirm
	// against the code that emits the event.
	Error string `json:"error,omitempty"`

	// Protocol, MsgCode and MsgSize presumably describe the message for
	// msgsend / msgrecv events — TODO confirm against the emitter. The
	// pointer types let a zero code or size be told apart from "not set".
	Protocol string  `json:"protocol,omitempty"`
	MsgCode  *uint64 `json:"msg_code,omitempty"`
	MsgSize  *uint32 `json:"msg_size,omitempty"`

	// LocalAddress and RemoteAddress are encoded under the JSON keys
	// "local" and "remote".
	LocalAddress  string `json:"local,omitempty"`
	RemoteAddress string `json:"remote,omitempty"`
}
// Peer represents a connected remote node.
type Peer struct {
	// rw is the underlying connection. The accessor methods read the node
	// identity and the socket addresses from it, and run closes it.
	rw *conn

	// running holds the matched sub-protocols as returned by matchProtocols,
	// keyed by protocol name.
	running map[string]*protoRW

	// created is the mclock timestamp taken when the Peer was constructed.
	created mclock.AbsTime

	// protoErr carries errors that make run disconnect the peer; run maps
	// them to a DiscReason via discReasonForError.
	protoErr chan error

	// closed is closed by run when the peer starts shutting down.
	closed chan struct{}

	// disc is read by run; a value received here ends the run loop.
	// NOTE(review): presumably fed by a local disconnect request — the
	// sender is not visible here.
	disc chan DiscReason

	// wg tracks the goroutines started by run, which waits on it before
	// returning.
	wg sync.WaitGroup

	// Logging and the event feed are not wired up yet.
	//log log.Logger
	// events receives message send / receive events if set
	//events *event.Feed
}
- func newPeer(conn *conn, protocols []Protocol) *Peer {
- protoMap := matchProtocols(protocols, conn.caps, conn)
- p := &Peer{
- rw: conn,
- running: protoMap,
- created: mclock.Now(),
- protoErr: make(chan error, len(protoMap)+1),
- closed: make(chan struct{}),
- }
- return p
- }
- func (p *Peer) Inbound() bool {
- return p.rw.is(inboundConn)
- }
- func (p *Peer) ID() enode.ID {
- return p.rw.node.ID()
- }
- func (p *Peer) RemoteAddr() net.Addr {
- return p.rw.fd.RemoteAddr()
- }
- func (p *Peer) LocalAddr() net.Addr {
- return p.rw.fd.LocalAddr()
- }
- func (p *Peer) run() (remoteRequested bool, err error) {
- var (
- writeStart = make(chan struct{}, 1)
- writeErr = make(chan error, 1)
- readErr = make(chan error, 1)
- reason DiscReason
- )
- p.wg.Add(2)
- go p.readLoop(readErr)
- go p.pingLoop()
- writeStart <- struct{}{}
- loop:
- for {
- select {
- case err = <-writeErr:
- if err != nil {
- reason = DiscNetworkError
- break loop
- }
- writeStart <- struct{}{}
- case err = <-readErr:
- if r, ok := err.(DiscReason); ok {
- remoteRequested = true
- reason = r
- } else {
- reason = DiscNetworkError
- }
- break loop
- case err = <-p.protoErr:
- reason = discReasonForError(err)
- break loop
- case err = <-p.disc:
- reason = discReasonForError(err)
- break loop
- }
- }
- close(p.closed)
- p.rw.close(reason)
- p.wg.Wait()
- return remoteRequested, err
- }
// readLoop is started by run as one of the goroutines tracked by p.wg and
// marks itself done on that WaitGroup when it returns.
//
// errc is the channel run watches for read errors. The current stub never
// sends on it and returns immediately.
func (p *Peer) readLoop(errc chan<- error) {
	defer p.wg.Done()
	// TODO: implement readLoop.
}
- func (p *Peer) pingLoop() {
- // TODO 实现pingLoop
- }
// protoRW bundles a matched Protocol (embedded) with the per-peer channels
// and writer used while that protocol runs. Instances are created by
// matchProtocols, which fills in Protocol, offset, in and w; the remaining
// fields are left at their zero value there.
type protoRW struct {
	Protocol
	in     chan Msg        // receives read messages
	closed <-chan struct{} // receives when peer is shutting down
	wstart <-chan struct{} // receives when write may start
	werr   chan<- error    // for write results
	offset uint64          // assigned by matchProtocols; the first match gets baseProtocolLength
	w      MsgWriter       // the read-writer handed to matchProtocols
}
- func matchProtocols(protocols []Protocol, caps []Cap, rw MsgReadWriter) map[string]*protoRW {
- sort.Sort(capsByNameAndVersion(caps))
- offset := baseProtocolLength
- result := make(map[string]*protoRW)
- outer:
- for _, capability := range caps {
- for _, proto := range protocols {
- if proto.Name == capability.Name && proto.Version == capability.Version {
- if old := result[capability.Name]; old != nil {
- offset -= old.Length
- }
- result[capability.Name] = &protoRW{Protocol: proto, offset: offset, in: make(chan Msg), w: rw}
- offset += proto.Length
- continue outer
- }
- }
- }
- return result
- }
|