messenger.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220
  1. package p2p
  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. )
  7. const (
  8. handlerTimeout = 1000
  9. )
  10. type Handlers map[string](func(p *Peer) Protocol)
  11. type Messenger struct {
  12. conn *Connection
  13. peer *Peer
  14. handlers Handlers
  15. protocolLock sync.RWMutex
  16. protocols []Protocol
  17. offsets []MsgCode // offsets for adaptive message idss
  18. protocolTable map[string]int
  19. quit chan chan bool
  20. err chan *PeerError
  21. pulse chan bool
  22. }
  23. func NewMessenger(peer *Peer, conn *Connection, errchan chan *PeerError, handlers Handlers) *Messenger {
  24. baseProtocol := NewBaseProtocol(peer)
  25. return &Messenger{
  26. conn: conn,
  27. peer: peer,
  28. offsets: []MsgCode{baseProtocol.Offset()},
  29. handlers: handlers,
  30. protocols: []Protocol{baseProtocol},
  31. protocolTable: make(map[string]int),
  32. err: errchan,
  33. pulse: make(chan bool, 1),
  34. quit: make(chan chan bool, 1),
  35. }
  36. }
  37. func (self *Messenger) Start() {
  38. self.conn.Open()
  39. go self.messenger()
  40. self.protocolLock.RLock()
  41. defer self.protocolLock.RUnlock()
  42. self.protocols[0].Start()
  43. }
  44. func (self *Messenger) Stop() {
  45. // close pulse to stop ping pong monitoring
  46. close(self.pulse)
  47. self.protocolLock.RLock()
  48. defer self.protocolLock.RUnlock()
  49. for _, protocol := range self.protocols {
  50. protocol.Stop() // could be parallel
  51. }
  52. q := make(chan bool)
  53. self.quit <- q
  54. <-q
  55. self.conn.Close()
  56. }
  57. func (self *Messenger) messenger() {
  58. in := self.conn.Read()
  59. for {
  60. select {
  61. case payload, ok := <-in:
  62. //dispatches message to the protocol asynchronously
  63. if ok {
  64. go self.handle(payload)
  65. } else {
  66. return
  67. }
  68. case q := <-self.quit:
  69. q <- true
  70. return
  71. }
  72. }
  73. }
  74. // handles each message by dispatching to the appropriate protocol
  75. // using adaptive message codes
  76. // this function is started as a separate go routine for each message
  77. // it waits for the protocol response
  78. // then encodes and sends outgoing messages to the connection's write channel
  79. func (self *Messenger) handle(payload []byte) {
  80. // send ping to heartbeat channel signalling time of last message
  81. // select {
  82. // case self.pulse <- true:
  83. // default:
  84. // }
  85. self.pulse <- true
  86. // initialise message from payload
  87. msg, err := NewMsgFromBytes(payload)
  88. if err != nil {
  89. self.err <- NewPeerError(MiscError, " %v", err)
  90. return
  91. }
  92. // retrieves protocol based on message Code
  93. protocol, offset, peerErr := self.getProtocol(msg.Code())
  94. if err != nil {
  95. self.err <- peerErr
  96. return
  97. }
  98. // reset message code based on adaptive offset
  99. msg.Decode(offset)
  100. // dispatches
  101. response := make(chan *Msg)
  102. go protocol.HandleIn(msg, response)
  103. // protocol reponse timeout to prevent leaks
  104. timer := time.After(handlerTimeout * time.Millisecond)
  105. for {
  106. select {
  107. case outgoing, ok := <-response:
  108. // we check if response channel is not closed
  109. if ok {
  110. self.conn.Write() <- outgoing.Encode(offset)
  111. } else {
  112. return
  113. }
  114. case <-timer:
  115. return
  116. }
  117. }
  118. }
  119. // negotiated protocols
  120. // stores offsets needed for adaptive message id scheme
  121. // based on offsets set at handshake
  122. // get the right protocol to handle the message
  123. func (self *Messenger) getProtocol(code MsgCode) (Protocol, MsgCode, *PeerError) {
  124. self.protocolLock.RLock()
  125. defer self.protocolLock.RUnlock()
  126. base := MsgCode(0)
  127. for index, offset := range self.offsets {
  128. if code < offset {
  129. return self.protocols[index], base, nil
  130. }
  131. base = offset
  132. }
  133. return nil, MsgCode(0), NewPeerError(InvalidMsgCode, " %v", code)
  134. }
  135. func (self *Messenger) PingPong(timeout time.Duration, gracePeriod time.Duration, pingCallback func(), timeoutCallback func()) {
  136. fmt.Printf("pingpong keepalive started at %v", time.Now())
  137. timer := time.After(timeout)
  138. pinged := false
  139. for {
  140. select {
  141. case _, ok := <-self.pulse:
  142. if ok {
  143. pinged = false
  144. timer = time.After(timeout)
  145. } else {
  146. // pulse is closed, stop monitoring
  147. return
  148. }
  149. case <-timer:
  150. if pinged {
  151. fmt.Printf("timeout at %v", time.Now())
  152. timeoutCallback()
  153. return
  154. } else {
  155. fmt.Printf("pinged at %v", time.Now())
  156. pingCallback()
  157. timer = time.After(gracePeriod)
  158. pinged = true
  159. }
  160. }
  161. }
  162. }
  163. func (self *Messenger) AddProtocols(protocols []string) {
  164. self.protocolLock.Lock()
  165. defer self.protocolLock.Unlock()
  166. i := len(self.offsets)
  167. offset := self.offsets[i-1]
  168. for _, name := range protocols {
  169. protocolFunc, ok := self.handlers[name]
  170. if ok {
  171. protocol := protocolFunc(self.peer)
  172. self.protocolTable[name] = i
  173. i++
  174. offset += protocol.Offset()
  175. fmt.Println("offset ", name, offset)
  176. self.offsets = append(self.offsets, offset)
  177. self.protocols = append(self.protocols, protocol)
  178. protocol.Start()
  179. } else {
  180. fmt.Println("no ", name)
  181. // protocol not handled
  182. }
  183. }
  184. }
  185. func (self *Messenger) Write(protocol string, msg *Msg) error {
  186. self.protocolLock.RLock()
  187. defer self.protocolLock.RUnlock()
  188. i := 0
  189. offset := MsgCode(0)
  190. if len(protocol) > 0 {
  191. var ok bool
  192. i, ok = self.protocolTable[protocol]
  193. if !ok {
  194. return fmt.Errorf("protocol %v not handled by peer", protocol)
  195. }
  196. offset = self.offsets[i-1]
  197. }
  198. handler := self.protocols[i]
  199. // checking if protocol status/caps allows the message to be sent out
  200. if handler.HandleOut(msg) {
  201. self.conn.Write() <- msg.Encode(offset)
  202. }
  203. return nil
  204. }