| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275 |
- package p2p
- import (
- "bytes"
- // "fmt"
- "net"
- "time"
- "github.com/ethereum/eth-go/ethutil"
- )
- type Connection struct {
- conn net.Conn
- // conn NetworkConnection
- timeout time.Duration
- in chan []byte
- out chan []byte
- err chan *PeerError
- closingIn chan chan bool
- closingOut chan chan bool
- }
- // const readBufferLength = 2 //for testing
- const readBufferLength = 1440
- const partialsQueueSize = 10
- const maxPendingQueueSize = 1
- const defaultTimeout = 500
- var magicToken = []byte{34, 64, 8, 145}
- func (self *Connection) Open() {
- go self.startRead()
- go self.startWrite()
- }
- func (self *Connection) Close() {
- self.closeIn()
- self.closeOut()
- }
- func (self *Connection) closeIn() {
- errc := make(chan bool)
- self.closingIn <- errc
- <-errc
- }
- func (self *Connection) closeOut() {
- errc := make(chan bool)
- self.closingOut <- errc
- <-errc
- }
- func NewConnection(conn net.Conn, errchan chan *PeerError) *Connection {
- return &Connection{
- conn: conn,
- timeout: defaultTimeout,
- in: make(chan []byte),
- out: make(chan []byte),
- err: errchan,
- closingIn: make(chan chan bool, 1),
- closingOut: make(chan chan bool, 1),
- }
- }
- func (self *Connection) Read() <-chan []byte {
- return self.in
- }
- func (self *Connection) Write() chan<- []byte {
- return self.out
- }
- func (self *Connection) Error() <-chan *PeerError {
- return self.err
- }
- func (self *Connection) startRead() {
- payloads := make(chan []byte)
- done := make(chan *PeerError)
- pending := [][]byte{}
- var head []byte
- var wait time.Duration // initally 0 (no delay)
- read := time.After(wait * time.Millisecond)
- for {
- // if pending empty, nil channel blocks
- var in chan []byte
- if len(pending) > 0 {
- in = self.in // enable send case
- head = pending[0]
- } else {
- in = nil
- }
- select {
- case <-read:
- go self.read(payloads, done)
- case err := <-done:
- if err == nil { // no error but nothing to read
- if len(pending) < maxPendingQueueSize {
- wait = 100
- } else if wait == 0 {
- wait = 100
- } else {
- wait = 2 * wait
- }
- } else {
- self.err <- err // report error
- wait = 100
- }
- read = time.After(wait * time.Millisecond)
- case payload := <-payloads:
- pending = append(pending, payload)
- if len(pending) < maxPendingQueueSize {
- wait = 0
- } else {
- wait = 100
- }
- read = time.After(wait * time.Millisecond)
- case in <- head:
- pending = pending[1:]
- case errc := <-self.closingIn:
- errc <- true
- close(self.in)
- return
- }
- }
- }
- func (self *Connection) startWrite() {
- pending := [][]byte{}
- done := make(chan *PeerError)
- writing := false
- for {
- if len(pending) > 0 && !writing {
- writing = true
- go self.write(pending[0], done)
- }
- select {
- case payload := <-self.out:
- pending = append(pending, payload)
- case err := <-done:
- if err == nil {
- pending = pending[1:]
- writing = false
- } else {
- self.err <- err // report error
- }
- case errc := <-self.closingOut:
- errc <- true
- close(self.out)
- return
- }
- }
- }
- func pack(payload []byte) (packet []byte) {
- length := ethutil.NumberToBytes(uint32(len(payload)), 32)
- // return error if too long?
- // Write magic token and payload length (first 8 bytes)
- packet = append(magicToken, length...)
- packet = append(packet, payload...)
- return
- }
- func avoidPanic(done chan *PeerError) {
- if rec := recover(); rec != nil {
- err := NewPeerError(MiscError, " %v", rec)
- logger.Debugln(err)
- done <- err
- }
- }
- func (self *Connection) write(payload []byte, done chan *PeerError) {
- defer avoidPanic(done)
- var err *PeerError
- _, ok := self.conn.Write(pack(payload))
- if ok != nil {
- err = NewPeerError(WriteError, " %v", ok)
- logger.Debugln(err)
- }
- done <- err
- }
- func (self *Connection) read(payloads chan []byte, done chan *PeerError) {
- //defer avoidPanic(done)
- partials := make(chan []byte, partialsQueueSize)
- errc := make(chan *PeerError)
- go self.readPartials(partials, errc)
- packet := []byte{}
- length := 8
- start := true
- var err *PeerError
- out:
- for {
- // appends partials read via connection until packet is
- // - either parseable (>=8bytes)
- // - or complete (payload fully consumed)
- for len(packet) < length {
- partial, ok := <-partials
- if !ok { // partials channel is closed
- err = <-errc
- if err == nil && len(packet) > 0 {
- if start {
- err = NewPeerError(PacketTooShort, "%v", packet)
- } else {
- err = NewPeerError(PayloadTooShort, "%d < %d", len(packet), length)
- }
- }
- break out
- }
- packet = append(packet, partial...)
- }
- if start {
- // at least 8 bytes read, can validate packet
- if bytes.Compare(magicToken, packet[:4]) != 0 {
- err = NewPeerError(MagicTokenMismatch, " received %v", packet[:4])
- break
- }
- length = int(ethutil.BytesToNumber(packet[4:8]))
- packet = packet[8:]
- if length > 0 {
- start = false // now consuming payload
- } else { //penalize peer but read on
- self.err <- NewPeerError(EmptyPayload, "")
- length = 8
- }
- } else {
- // packet complete (payload fully consumed)
- payloads <- packet[:length]
- packet = packet[length:] // resclice packet
- start = true
- length = 8
- }
- }
- // this stops partials read via the connection, should we?
- //if err != nil {
- // select {
- // case errc <- err
- // default:
- //}
- done <- err
- }
- func (self *Connection) readPartials(partials chan []byte, errc chan *PeerError) {
- defer close(partials)
- for {
- // Give buffering some time
- self.conn.SetReadDeadline(time.Now().Add(self.timeout * time.Millisecond))
- buffer := make([]byte, readBufferLength)
- // read partial from connection
- bytesRead, err := self.conn.Read(buffer)
- if err == nil || err.Error() == "EOF" {
- if bytesRead > 0 {
- partials <- buffer[:bytesRead]
- }
- if err != nil && err.Error() == "EOF" {
- break
- }
- } else {
- // unexpected error, report to errc
- err := NewPeerError(ReadError, " %v", err)
- logger.Debugln(err)
- errc <- err
- return // will close partials channel
- }
- }
- close(errc)
- }
|