| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220 |
- package p2p
- import (
- "fmt"
- "sync"
- "time"
- )
- const (
- handlerTimeout = 1000
- )
- type Handlers map[string](func(p *Peer) Protocol)
- type Messenger struct {
- conn *Connection
- peer *Peer
- handlers Handlers
- protocolLock sync.RWMutex
- protocols []Protocol
- offsets []MsgCode // offsets for adaptive message idss
- protocolTable map[string]int
- quit chan chan bool
- err chan *PeerError
- pulse chan bool
- }
- func NewMessenger(peer *Peer, conn *Connection, errchan chan *PeerError, handlers Handlers) *Messenger {
- baseProtocol := NewBaseProtocol(peer)
- return &Messenger{
- conn: conn,
- peer: peer,
- offsets: []MsgCode{baseProtocol.Offset()},
- handlers: handlers,
- protocols: []Protocol{baseProtocol},
- protocolTable: make(map[string]int),
- err: errchan,
- pulse: make(chan bool, 1),
- quit: make(chan chan bool, 1),
- }
- }
- func (self *Messenger) Start() {
- self.conn.Open()
- go self.messenger()
- self.protocolLock.RLock()
- defer self.protocolLock.RUnlock()
- self.protocols[0].Start()
- }
- func (self *Messenger) Stop() {
- // close pulse to stop ping pong monitoring
- close(self.pulse)
- self.protocolLock.RLock()
- defer self.protocolLock.RUnlock()
- for _, protocol := range self.protocols {
- protocol.Stop() // could be parallel
- }
- q := make(chan bool)
- self.quit <- q
- <-q
- self.conn.Close()
- }
- func (self *Messenger) messenger() {
- in := self.conn.Read()
- for {
- select {
- case payload, ok := <-in:
- //dispatches message to the protocol asynchronously
- if ok {
- go self.handle(payload)
- } else {
- return
- }
- case q := <-self.quit:
- q <- true
- return
- }
- }
- }
- // handles each message by dispatching to the appropriate protocol
- // using adaptive message codes
- // this function is started as a separate go routine for each message
- // it waits for the protocol response
- // then encodes and sends outgoing messages to the connection's write channel
- func (self *Messenger) handle(payload []byte) {
- // send ping to heartbeat channel signalling time of last message
- // select {
- // case self.pulse <- true:
- // default:
- // }
- self.pulse <- true
- // initialise message from payload
- msg, err := NewMsgFromBytes(payload)
- if err != nil {
- self.err <- NewPeerError(MiscError, " %v", err)
- return
- }
- // retrieves protocol based on message Code
- protocol, offset, peerErr := self.getProtocol(msg.Code())
- if err != nil {
- self.err <- peerErr
- return
- }
- // reset message code based on adaptive offset
- msg.Decode(offset)
- // dispatches
- response := make(chan *Msg)
- go protocol.HandleIn(msg, response)
- // protocol reponse timeout to prevent leaks
- timer := time.After(handlerTimeout * time.Millisecond)
- for {
- select {
- case outgoing, ok := <-response:
- // we check if response channel is not closed
- if ok {
- self.conn.Write() <- outgoing.Encode(offset)
- } else {
- return
- }
- case <-timer:
- return
- }
- }
- }
- // negotiated protocols
- // stores offsets needed for adaptive message id scheme
- // based on offsets set at handshake
- // get the right protocol to handle the message
- func (self *Messenger) getProtocol(code MsgCode) (Protocol, MsgCode, *PeerError) {
- self.protocolLock.RLock()
- defer self.protocolLock.RUnlock()
- base := MsgCode(0)
- for index, offset := range self.offsets {
- if code < offset {
- return self.protocols[index], base, nil
- }
- base = offset
- }
- return nil, MsgCode(0), NewPeerError(InvalidMsgCode, " %v", code)
- }
- func (self *Messenger) PingPong(timeout time.Duration, gracePeriod time.Duration, pingCallback func(), timeoutCallback func()) {
- fmt.Printf("pingpong keepalive started at %v", time.Now())
- timer := time.After(timeout)
- pinged := false
- for {
- select {
- case _, ok := <-self.pulse:
- if ok {
- pinged = false
- timer = time.After(timeout)
- } else {
- // pulse is closed, stop monitoring
- return
- }
- case <-timer:
- if pinged {
- fmt.Printf("timeout at %v", time.Now())
- timeoutCallback()
- return
- } else {
- fmt.Printf("pinged at %v", time.Now())
- pingCallback()
- timer = time.After(gracePeriod)
- pinged = true
- }
- }
- }
- }
- func (self *Messenger) AddProtocols(protocols []string) {
- self.protocolLock.Lock()
- defer self.protocolLock.Unlock()
- i := len(self.offsets)
- offset := self.offsets[i-1]
- for _, name := range protocols {
- protocolFunc, ok := self.handlers[name]
- if ok {
- protocol := protocolFunc(self.peer)
- self.protocolTable[name] = i
- i++
- offset += protocol.Offset()
- fmt.Println("offset ", name, offset)
- self.offsets = append(self.offsets, offset)
- self.protocols = append(self.protocols, protocol)
- protocol.Start()
- } else {
- fmt.Println("no ", name)
- // protocol not handled
- }
- }
- }
- func (self *Messenger) Write(protocol string, msg *Msg) error {
- self.protocolLock.RLock()
- defer self.protocolLock.RUnlock()
- i := 0
- offset := MsgCode(0)
- if len(protocol) > 0 {
- var ok bool
- i, ok = self.protocolTable[protocol]
- if !ok {
- return fmt.Errorf("protocol %v not handled by peer", protocol)
- }
- offset = self.offsets[i-1]
- }
- handler := self.protocols[i]
- // checking if protocol status/caps allows the message to be sent out
- if handler.HandleOut(msg) {
- self.conn.Write() <- msg.Encode(offset)
- }
- return nil
- }
|