connection.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275
  1. package p2p
  2. import (
  3. "bytes"
  4. // "fmt"
  5. "net"
  6. "time"
  7. "github.com/ethereum/eth-go/ethutil"
  8. )
  9. type Connection struct {
  10. conn net.Conn
  11. // conn NetworkConnection
  12. timeout time.Duration
  13. in chan []byte
  14. out chan []byte
  15. err chan *PeerError
  16. closingIn chan chan bool
  17. closingOut chan chan bool
  18. }
  19. // const readBufferLength = 2 //for testing
  20. const readBufferLength = 1440
  21. const partialsQueueSize = 10
  22. const maxPendingQueueSize = 1
  23. const defaultTimeout = 500
  24. var magicToken = []byte{34, 64, 8, 145}
  25. func (self *Connection) Open() {
  26. go self.startRead()
  27. go self.startWrite()
  28. }
  29. func (self *Connection) Close() {
  30. self.closeIn()
  31. self.closeOut()
  32. }
  33. func (self *Connection) closeIn() {
  34. errc := make(chan bool)
  35. self.closingIn <- errc
  36. <-errc
  37. }
  38. func (self *Connection) closeOut() {
  39. errc := make(chan bool)
  40. self.closingOut <- errc
  41. <-errc
  42. }
  43. func NewConnection(conn net.Conn, errchan chan *PeerError) *Connection {
  44. return &Connection{
  45. conn: conn,
  46. timeout: defaultTimeout,
  47. in: make(chan []byte),
  48. out: make(chan []byte),
  49. err: errchan,
  50. closingIn: make(chan chan bool, 1),
  51. closingOut: make(chan chan bool, 1),
  52. }
  53. }
  54. func (self *Connection) Read() <-chan []byte {
  55. return self.in
  56. }
  57. func (self *Connection) Write() chan<- []byte {
  58. return self.out
  59. }
  60. func (self *Connection) Error() <-chan *PeerError {
  61. return self.err
  62. }
  63. func (self *Connection) startRead() {
  64. payloads := make(chan []byte)
  65. done := make(chan *PeerError)
  66. pending := [][]byte{}
  67. var head []byte
  68. var wait time.Duration // initally 0 (no delay)
  69. read := time.After(wait * time.Millisecond)
  70. for {
  71. // if pending empty, nil channel blocks
  72. var in chan []byte
  73. if len(pending) > 0 {
  74. in = self.in // enable send case
  75. head = pending[0]
  76. } else {
  77. in = nil
  78. }
  79. select {
  80. case <-read:
  81. go self.read(payloads, done)
  82. case err := <-done:
  83. if err == nil { // no error but nothing to read
  84. if len(pending) < maxPendingQueueSize {
  85. wait = 100
  86. } else if wait == 0 {
  87. wait = 100
  88. } else {
  89. wait = 2 * wait
  90. }
  91. } else {
  92. self.err <- err // report error
  93. wait = 100
  94. }
  95. read = time.After(wait * time.Millisecond)
  96. case payload := <-payloads:
  97. pending = append(pending, payload)
  98. if len(pending) < maxPendingQueueSize {
  99. wait = 0
  100. } else {
  101. wait = 100
  102. }
  103. read = time.After(wait * time.Millisecond)
  104. case in <- head:
  105. pending = pending[1:]
  106. case errc := <-self.closingIn:
  107. errc <- true
  108. close(self.in)
  109. return
  110. }
  111. }
  112. }
  113. func (self *Connection) startWrite() {
  114. pending := [][]byte{}
  115. done := make(chan *PeerError)
  116. writing := false
  117. for {
  118. if len(pending) > 0 && !writing {
  119. writing = true
  120. go self.write(pending[0], done)
  121. }
  122. select {
  123. case payload := <-self.out:
  124. pending = append(pending, payload)
  125. case err := <-done:
  126. if err == nil {
  127. pending = pending[1:]
  128. writing = false
  129. } else {
  130. self.err <- err // report error
  131. }
  132. case errc := <-self.closingOut:
  133. errc <- true
  134. close(self.out)
  135. return
  136. }
  137. }
  138. }
  139. func pack(payload []byte) (packet []byte) {
  140. length := ethutil.NumberToBytes(uint32(len(payload)), 32)
  141. // return error if too long?
  142. // Write magic token and payload length (first 8 bytes)
  143. packet = append(magicToken, length...)
  144. packet = append(packet, payload...)
  145. return
  146. }
  147. func avoidPanic(done chan *PeerError) {
  148. if rec := recover(); rec != nil {
  149. err := NewPeerError(MiscError, " %v", rec)
  150. logger.Debugln(err)
  151. done <- err
  152. }
  153. }
  154. func (self *Connection) write(payload []byte, done chan *PeerError) {
  155. defer avoidPanic(done)
  156. var err *PeerError
  157. _, ok := self.conn.Write(pack(payload))
  158. if ok != nil {
  159. err = NewPeerError(WriteError, " %v", ok)
  160. logger.Debugln(err)
  161. }
  162. done <- err
  163. }
  164. func (self *Connection) read(payloads chan []byte, done chan *PeerError) {
  165. //defer avoidPanic(done)
  166. partials := make(chan []byte, partialsQueueSize)
  167. errc := make(chan *PeerError)
  168. go self.readPartials(partials, errc)
  169. packet := []byte{}
  170. length := 8
  171. start := true
  172. var err *PeerError
  173. out:
  174. for {
  175. // appends partials read via connection until packet is
  176. // - either parseable (>=8bytes)
  177. // - or complete (payload fully consumed)
  178. for len(packet) < length {
  179. partial, ok := <-partials
  180. if !ok { // partials channel is closed
  181. err = <-errc
  182. if err == nil && len(packet) > 0 {
  183. if start {
  184. err = NewPeerError(PacketTooShort, "%v", packet)
  185. } else {
  186. err = NewPeerError(PayloadTooShort, "%d < %d", len(packet), length)
  187. }
  188. }
  189. break out
  190. }
  191. packet = append(packet, partial...)
  192. }
  193. if start {
  194. // at least 8 bytes read, can validate packet
  195. if bytes.Compare(magicToken, packet[:4]) != 0 {
  196. err = NewPeerError(MagicTokenMismatch, " received %v", packet[:4])
  197. break
  198. }
  199. length = int(ethutil.BytesToNumber(packet[4:8]))
  200. packet = packet[8:]
  201. if length > 0 {
  202. start = false // now consuming payload
  203. } else { //penalize peer but read on
  204. self.err <- NewPeerError(EmptyPayload, "")
  205. length = 8
  206. }
  207. } else {
  208. // packet complete (payload fully consumed)
  209. payloads <- packet[:length]
  210. packet = packet[length:] // resclice packet
  211. start = true
  212. length = 8
  213. }
  214. }
  215. // this stops partials read via the connection, should we?
  216. //if err != nil {
  217. // select {
  218. // case errc <- err
  219. // default:
  220. //}
  221. done <- err
  222. }
  223. func (self *Connection) readPartials(partials chan []byte, errc chan *PeerError) {
  224. defer close(partials)
  225. for {
  226. // Give buffering some time
  227. self.conn.SetReadDeadline(time.Now().Add(self.timeout * time.Millisecond))
  228. buffer := make([]byte, readBufferLength)
  229. // read partial from connection
  230. bytesRead, err := self.conn.Read(buffer)
  231. if err == nil || err.Error() == "EOF" {
  232. if bytesRead > 0 {
  233. partials <- buffer[:bytesRead]
  234. }
  235. if err != nil && err.Error() == "EOF" {
  236. break
  237. }
  238. } else {
  239. // unexpected error, report to errc
  240. err := NewPeerError(ReadError, " %v", err)
  241. logger.Debugln(err)
  242. errc <- err
  243. return // will close partials channel
  244. }
  245. }
  246. close(errc)
  247. }