Browse Source

Merge pull request #15919 from ethersphere/p2p-protocols-pr

p2p/protocols, p2p/testing: protocol abstraction and testing
Balint Gabor 7 years ago
parent
commit
221486a291

+ 311 - 0
p2p/protocols/protocol.go

@@ -0,0 +1,311 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+/*
+Package protocols is an extension to p2p. It offers a user friendly simple way to define
+devp2p subprotocols by abstracting away code standardly shared by protocols.
+
+* automate assigments of code indexes to messages
+* automate RLP decoding/encoding based on reflecting
+* provide the forever loop to read incoming messages
+* standardise error handling related to communication
+* standardised	handshake negotiation
+* TODO: automatic generation of wire protocol specification for peers
+
+*/
+package protocols
+
+import (
+	"context"
+	"fmt"
+	"reflect"
+	"sync"
+
+	"github.com/ethereum/go-ethereum/p2p"
+)
+
+// error codes used by this  protocol scheme
+const (
+	ErrMsgTooLong = iota
+	ErrDecode
+	ErrWrite
+	ErrInvalidMsgCode
+	ErrInvalidMsgType
+	ErrHandshake
+	ErrNoHandler
+	ErrHandler
+)
+
+// error description strings associated with the codes
+var errorToString = map[int]string{
+	ErrMsgTooLong:     "Message too long",
+	ErrDecode:         "Invalid message (RLP error)",
+	ErrWrite:          "Error sending message",
+	ErrInvalidMsgCode: "Invalid message code",
+	ErrInvalidMsgType: "Invalid message type",
+	ErrHandshake:      "Handshake error",
+	ErrNoHandler:      "No handler registered error",
+	ErrHandler:        "Message handler error",
+}
+
+/*
+Error implements the standard go error interface.
+Use:
+
+  errorf(code, format, params ...interface{})
+
+Prints as:
+
+ <description>: <details>
+
+where description is given by code in errorToString
+and details is fmt.Sprintf(format, params...)
+
+exported field Code can be checked
+*/
+type Error struct {
+	Code    int
+	message string
+	format  string
+	params  []interface{}
+}
+
+func (e Error) Error() (message string) {
+	if len(e.message) == 0 {
+		name, ok := errorToString[e.Code]
+		if !ok {
+			panic("invalid message code")
+		}
+		e.message = name
+		if e.format != "" {
+			e.message += ": " + fmt.Sprintf(e.format, e.params...)
+		}
+	}
+	return e.message
+}
+
+func errorf(code int, format string, params ...interface{}) *Error {
+	return &Error{
+		Code:   code,
+		format: format,
+		params: params,
+	}
+}
+
+// Spec is a protocol specification including its name and version as well as
+// the types of messages which are exchanged
+type Spec struct {
+	// Name is the name of the protocol, often a three-letter word
+	Name string
+
+	// Version is the version number of the protocol
+	Version uint
+
+	// MaxMsgSize is the maximum accepted length of the message payload
+	MaxMsgSize uint32
+
+	// Messages is a list of message data types which this protocol uses, with
+	// each message type being sent with its array index as the code (so
+	// [&foo{}, &bar{}, &baz{}] would send foo, bar and baz with codes
+	// 0, 1 and 2 respectively)
+	// each message must have a single unique data type
+	Messages []interface{}
+
+	initOnce sync.Once
+	codes    map[reflect.Type]uint64
+	types    map[uint64]reflect.Type
+}
+
+func (s *Spec) init() {
+	s.initOnce.Do(func() {
+		s.codes = make(map[reflect.Type]uint64, len(s.Messages))
+		s.types = make(map[uint64]reflect.Type, len(s.Messages))
+		for i, msg := range s.Messages {
+			code := uint64(i)
+			typ := reflect.TypeOf(msg)
+			if typ.Kind() == reflect.Ptr {
+				typ = typ.Elem()
+			}
+			s.codes[typ] = code
+			s.types[code] = typ
+		}
+	})
+}
+
+// Length returns the number of message types in the protocol
+func (s *Spec) Length() uint64 {
+	return uint64(len(s.Messages))
+}
+
+// GetCode returns the message code of a type, and boolean second argument is
+// false if the message type is not found
+func (s *Spec) GetCode(msg interface{}) (uint64, bool) {
+	s.init()
+	typ := reflect.TypeOf(msg)
+	if typ.Kind() == reflect.Ptr {
+		typ = typ.Elem()
+	}
+	code, ok := s.codes[typ]
+	return code, ok
+}
+
+// NewMsg construct a new message type given the code
+func (s *Spec) NewMsg(code uint64) (interface{}, bool) {
+	s.init()
+	typ, ok := s.types[code]
+	if !ok {
+		return nil, false
+	}
+	return reflect.New(typ).Interface(), true
+}
+
+// Peer represents a remote peer or protocol instance that is running on a peer connection with
+// a remote peer
+type Peer struct {
+	*p2p.Peer                   // the p2p.Peer object representing the remote
+	rw        p2p.MsgReadWriter // p2p.MsgReadWriter to send messages to and read messages from
+	spec      *Spec
+}
+
+// NewPeer constructs a new peer
+// this constructor is called by the p2p.Protocol#Run function
+// the first two arguments are the arguments passed to p2p.Protocol.Run function
+// the third argument is the Spec describing the protocol
+func NewPeer(p *p2p.Peer, rw p2p.MsgReadWriter, spec *Spec) *Peer {
+	return &Peer{
+		Peer: p,
+		rw:   rw,
+		spec: spec,
+	}
+}
+
+// Run starts the forever loop that handles incoming messages
+// called within the p2p.Protocol#Run function
+// the handler argument is a function which is called for each message received
+// from the remote peer, a returned error causes the loop to exit
+// resulting in disconnection
+func (p *Peer) Run(handler func(msg interface{}) error) error {
+	for {
+		if err := p.handleIncoming(handler); err != nil {
+			return err
+		}
+	}
+}
+
+// Drop disconnects a peer.
+// TODO: may need to implement protocol drop only? don't want to kick off the peer
+// if they are useful for other protocols
+func (p *Peer) Drop(err error) {
+	p.Disconnect(p2p.DiscSubprotocolError)
+}
+
+// Send takes a message, encodes it in RLP, finds the right message code and sends the
+// message off to the peer
+// this low level call will be wrapped by libraries providing routed or broadcast sends
+// but often just used to forward and push messages to directly connected peers
+func (p *Peer) Send(msg interface{}) error {
+	code, found := p.spec.GetCode(msg)
+	if !found {
+		return errorf(ErrInvalidMsgType, "%v", code)
+	}
+	return p2p.Send(p.rw, code, msg)
+}
+
+// handleIncoming(code)
+// is called each cycle of the main forever loop that dispatches incoming messages
+// if this returns an error the loop returns and the peer is disconnected with the error
+// this generic handler
+// * checks message size,
+// * checks for out-of-range message codes,
+// * handles decoding with reflection,
+// * call handlers as callbacks
+func (p *Peer) handleIncoming(handle func(msg interface{}) error) error {
+	msg, err := p.rw.ReadMsg()
+	if err != nil {
+		return err
+	}
+	// make sure that the payload has been fully consumed
+	defer msg.Discard()
+
+	if msg.Size > p.spec.MaxMsgSize {
+		return errorf(ErrMsgTooLong, "%v > %v", msg.Size, p.spec.MaxMsgSize)
+	}
+
+	val, ok := p.spec.NewMsg(msg.Code)
+	if !ok {
+		return errorf(ErrInvalidMsgCode, "%v", msg.Code)
+	}
+	if err := msg.Decode(val); err != nil {
+		return errorf(ErrDecode, "<= %v: %v", msg, err)
+	}
+
+	// call the registered handler callbacks
+	// a registered callback take the decoded message as argument as an interface
+	// which the handler is supposed to cast to the appropriate type
+	// it is entirely safe not to check the cast in the handler since the handler is
+	// chosen based on the proper type in the first place
+	if err := handle(val); err != nil {
+		return errorf(ErrHandler, "(msg code %v): %v", msg.Code, err)
+	}
+	return nil
+}
+
+// Handshake negotiates a handshake on the peer connection
+// * arguments
+//   * context
+//   * the local handshake to be sent to the remote peer
+//   * funcion to be called on the remote handshake (can be nil)
+// * expects a remote handshake back of the same type
+// * the dialing peer needs to send the handshake first and then waits for remote
+// * the listening peer waits for the remote handshake and then sends it
+// returns the remote handshake and an error
+func (p *Peer) Handshake(ctx context.Context, hs interface{}, verify func(interface{}) error) (rhs interface{}, err error) {
+	if _, ok := p.spec.GetCode(hs); !ok {
+		return nil, errorf(ErrHandshake, "unknown handshake message type: %T", hs)
+	}
+	errc := make(chan error, 2)
+	handle := func(msg interface{}) error {
+		rhs = msg
+		if verify != nil {
+			return verify(rhs)
+		}
+		return nil
+	}
+	send := func() { errc <- p.Send(hs) }
+	receive := func() { errc <- p.handleIncoming(handle) }
+
+	go func() {
+		if p.Inbound() {
+			receive()
+			send()
+		} else {
+			send()
+			receive()
+		}
+	}()
+
+	for i := 0; i < 2; i++ {
+		select {
+		case err = <-errc:
+		case <-ctx.Done():
+			err = ctx.Err()
+		}
+		if err != nil {
+			return nil, errorf(ErrHandshake, err.Error())
+		}
+	}
+	return rhs, nil
+}

+ 389 - 0
p2p/protocols/protocol_test.go

@@ -0,0 +1,389 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package protocols
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/ethereum/go-ethereum/p2p"
+	"github.com/ethereum/go-ethereum/p2p/discover"
+	"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
+	p2ptest "github.com/ethereum/go-ethereum/p2p/testing"
+)
+
+// handshake message type
+type hs0 struct {
+	C uint
+}
+
+// message to kill/drop the peer with nodeID
+type kill struct {
+	C discover.NodeID
+}
+
+// message to drop connection
+type drop struct {
+}
+
+/// protoHandshake represents module-independent aspects of the protocol and is
+// the first message peers send and receive as part the initial exchange
+type protoHandshake struct {
+	Version   uint   // local and remote peer should have identical version
+	NetworkID string // local and remote peer should have identical network id
+}
+
+// checkProtoHandshake verifies local and remote protoHandshakes match
+func checkProtoHandshake(testVersion uint, testNetworkID string) func(interface{}) error {
+	return func(rhs interface{}) error {
+		remote := rhs.(*protoHandshake)
+		if remote.NetworkID != testNetworkID {
+			return fmt.Errorf("%s (!= %s)", remote.NetworkID, testNetworkID)
+		}
+
+		if remote.Version != testVersion {
+			return fmt.Errorf("%d (!= %d)", remote.Version, testVersion)
+		}
+		return nil
+	}
+}
+
+// newProtocol sets up a protocol
+// the run function here demonstrates a typical protocol using peerPool, handshake
+// and messages registered to handlers
+func newProtocol(pp *p2ptest.TestPeerPool) func(*p2p.Peer, p2p.MsgReadWriter) error {
+	spec := &Spec{
+		Name:       "test",
+		Version:    42,
+		MaxMsgSize: 10 * 1024,
+		Messages: []interface{}{
+			protoHandshake{},
+			hs0{},
+			kill{},
+			drop{},
+		},
+	}
+	return func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
+		peer := NewPeer(p, rw, spec)
+
+		// initiate one-off protohandshake and check validity
+		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+		defer cancel()
+		phs := &protoHandshake{42, "420"}
+		hsCheck := checkProtoHandshake(phs.Version, phs.NetworkID)
+		_, err := peer.Handshake(ctx, phs, hsCheck)
+		if err != nil {
+			return err
+		}
+
+		lhs := &hs0{42}
+		// module handshake demonstrating a simple repeatable exchange of same-type message
+		hs, err := peer.Handshake(ctx, lhs, nil)
+		if err != nil {
+			return err
+		}
+
+		if rmhs := hs.(*hs0); rmhs.C > lhs.C {
+			return fmt.Errorf("handshake mismatch remote %v > local %v", rmhs.C, lhs.C)
+		}
+
+		handle := func(msg interface{}) error {
+			switch msg := msg.(type) {
+
+			case *protoHandshake:
+				return errors.New("duplicate handshake")
+
+			case *hs0:
+				rhs := msg
+				if rhs.C > lhs.C {
+					return fmt.Errorf("handshake mismatch remote %v > local %v", rhs.C, lhs.C)
+				}
+				lhs.C += rhs.C
+				return peer.Send(lhs)
+
+			case *kill:
+				// demonstrates use of peerPool, killing another peer connection as a response to a message
+				id := msg.C
+				pp.Get(id).Drop(errors.New("killed"))
+				return nil
+
+			case *drop:
+				// for testing we can trigger self induced disconnect upon receiving drop message
+				return errors.New("dropped")
+
+			default:
+				return fmt.Errorf("unknown message type: %T", msg)
+			}
+		}
+
+		pp.Add(peer)
+		defer pp.Remove(peer)
+		return peer.Run(handle)
+	}
+}
+
+func protocolTester(t *testing.T, pp *p2ptest.TestPeerPool) *p2ptest.ProtocolTester {
+	conf := adapters.RandomNodeConfig()
+	return p2ptest.NewProtocolTester(t, conf.ID, 2, newProtocol(pp))
+}
+
+func protoHandshakeExchange(id discover.NodeID, proto *protoHandshake) []p2ptest.Exchange {
+
+	return []p2ptest.Exchange{
+		{
+			Expects: []p2ptest.Expect{
+				{
+					Code: 0,
+					Msg:  &protoHandshake{42, "420"},
+					Peer: id,
+				},
+			},
+		},
+		{
+			Triggers: []p2ptest.Trigger{
+				{
+					Code: 0,
+					Msg:  proto,
+					Peer: id,
+				},
+			},
+		},
+	}
+}
+
+func runProtoHandshake(t *testing.T, proto *protoHandshake, errs ...error) {
+	pp := p2ptest.NewTestPeerPool()
+	s := protocolTester(t, pp)
+	// TODO: make this more than one handshake
+	id := s.IDs[0]
+	if err := s.TestExchanges(protoHandshakeExchange(id, proto)...); err != nil {
+		t.Fatal(err)
+	}
+	var disconnects []*p2ptest.Disconnect
+	for i, err := range errs {
+		disconnects = append(disconnects, &p2ptest.Disconnect{Peer: s.IDs[i], Error: err})
+	}
+	if err := s.TestDisconnected(disconnects...); err != nil {
+		t.Fatal(err)
+	}
+}
+
+func TestProtoHandshakeVersionMismatch(t *testing.T) {
+	runProtoHandshake(t, &protoHandshake{41, "420"}, errorf(ErrHandshake, errorf(ErrHandler, "(msg code 0): 41 (!= 42)").Error()))
+}
+
+func TestProtoHandshakeNetworkIDMismatch(t *testing.T) {
+	runProtoHandshake(t, &protoHandshake{42, "421"}, errorf(ErrHandshake, errorf(ErrHandler, "(msg code 0): 421 (!= 420)").Error()))
+}
+
+func TestProtoHandshakeSuccess(t *testing.T) {
+	runProtoHandshake(t, &protoHandshake{42, "420"})
+}
+
+func moduleHandshakeExchange(id discover.NodeID, resp uint) []p2ptest.Exchange {
+
+	return []p2ptest.Exchange{
+		{
+			Expects: []p2ptest.Expect{
+				{
+					Code: 1,
+					Msg:  &hs0{42},
+					Peer: id,
+				},
+			},
+		},
+		{
+			Triggers: []p2ptest.Trigger{
+				{
+					Code: 1,
+					Msg:  &hs0{resp},
+					Peer: id,
+				},
+			},
+		},
+	}
+}
+
+func runModuleHandshake(t *testing.T, resp uint, errs ...error) {
+	pp := p2ptest.NewTestPeerPool()
+	s := protocolTester(t, pp)
+	id := s.IDs[0]
+	if err := s.TestExchanges(protoHandshakeExchange(id, &protoHandshake{42, "420"})...); err != nil {
+		t.Fatal(err)
+	}
+	if err := s.TestExchanges(moduleHandshakeExchange(id, resp)...); err != nil {
+		t.Fatal(err)
+	}
+	var disconnects []*p2ptest.Disconnect
+	for i, err := range errs {
+		disconnects = append(disconnects, &p2ptest.Disconnect{Peer: s.IDs[i], Error: err})
+	}
+	if err := s.TestDisconnected(disconnects...); err != nil {
+		t.Fatal(err)
+	}
+}
+
+func TestModuleHandshakeError(t *testing.T) {
+	runModuleHandshake(t, 43, fmt.Errorf("handshake mismatch remote 43 > local 42"))
+}
+
+func TestModuleHandshakeSuccess(t *testing.T) {
+	runModuleHandshake(t, 42)
+}
+
+// testing complex interactions over multiple peers, relaying, dropping
+func testMultiPeerSetup(a, b discover.NodeID) []p2ptest.Exchange {
+
+	return []p2ptest.Exchange{
+		{
+			Label: "primary handshake",
+			Expects: []p2ptest.Expect{
+				{
+					Code: 0,
+					Msg:  &protoHandshake{42, "420"},
+					Peer: a,
+				},
+				{
+					Code: 0,
+					Msg:  &protoHandshake{42, "420"},
+					Peer: b,
+				},
+			},
+		},
+		{
+			Label: "module handshake",
+			Triggers: []p2ptest.Trigger{
+				{
+					Code: 0,
+					Msg:  &protoHandshake{42, "420"},
+					Peer: a,
+				},
+				{
+					Code: 0,
+					Msg:  &protoHandshake{42, "420"},
+					Peer: b,
+				},
+			},
+			Expects: []p2ptest.Expect{
+				{
+					Code: 1,
+					Msg:  &hs0{42},
+					Peer: a,
+				},
+				{
+					Code: 1,
+					Msg:  &hs0{42},
+					Peer: b,
+				},
+			},
+		},
+
+		{Label: "alternative module handshake", Triggers: []p2ptest.Trigger{{Code: 1, Msg: &hs0{41}, Peer: a},
+			{Code: 1, Msg: &hs0{41}, Peer: b}}},
+		{Label: "repeated module handshake", Triggers: []p2ptest.Trigger{{Code: 1, Msg: &hs0{1}, Peer: a}}},
+		{Label: "receiving repeated module handshake", Expects: []p2ptest.Expect{{Code: 1, Msg: &hs0{43}, Peer: a}}}}
+}
+
+func runMultiplePeers(t *testing.T, peer int, errs ...error) {
+	pp := p2ptest.NewTestPeerPool()
+	s := protocolTester(t, pp)
+
+	if err := s.TestExchanges(testMultiPeerSetup(s.IDs[0], s.IDs[1])...); err != nil {
+		t.Fatal(err)
+	}
+	// after some exchanges of messages, we can test state changes
+	// here this is simply demonstrated by the peerPool
+	// after the handshake negotiations peers must be added to the pool
+	// time.Sleep(1)
+	tick := time.NewTicker(10 * time.Millisecond)
+	timeout := time.NewTimer(1 * time.Second)
+WAIT:
+	for {
+		select {
+		case <-tick.C:
+			if pp.Has(s.IDs[0]) {
+				break WAIT
+			}
+		case <-timeout.C:
+			t.Fatal("timeout")
+		}
+	}
+	if !pp.Has(s.IDs[1]) {
+		t.Fatalf("missing peer test-1: %v (%v)", pp, s.IDs)
+	}
+
+	// peer 0 sends kill request for peer with index <peer>
+	err := s.TestExchanges(p2ptest.Exchange{
+		Triggers: []p2ptest.Trigger{
+			{
+				Code: 2,
+				Msg:  &kill{s.IDs[peer]},
+				Peer: s.IDs[0],
+			},
+		},
+	})
+
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// the peer not killed sends a drop request
+	err = s.TestExchanges(p2ptest.Exchange{
+		Triggers: []p2ptest.Trigger{
+			{
+				Code: 3,
+				Msg:  &drop{},
+				Peer: s.IDs[(peer+1)%2],
+			},
+		},
+	})
+
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// check the actual discconnect errors on the individual peers
+	var disconnects []*p2ptest.Disconnect
+	for i, err := range errs {
+		disconnects = append(disconnects, &p2ptest.Disconnect{Peer: s.IDs[i], Error: err})
+	}
+	if err := s.TestDisconnected(disconnects...); err != nil {
+		t.Fatal(err)
+	}
+	// test if disconnected peers have been removed from peerPool
+	if pp.Has(s.IDs[peer]) {
+		t.Fatalf("peer test-%v not dropped: %v (%v)", peer, pp, s.IDs)
+	}
+
+}
+
+func TestMultiplePeersDropSelf(t *testing.T) {
+	runMultiplePeers(t, 0,
+		fmt.Errorf("subprotocol error"),
+		fmt.Errorf("Message handler error: (msg code 3): dropped"),
+	)
+}
+
+func TestMultiplePeersDropOther(t *testing.T) {
+	runMultiplePeers(t, 1,
+		fmt.Errorf("Message handler error: (msg code 3): dropped"),
+		fmt.Errorf("subprotocol error"),
+	)
+}

+ 67 - 0
p2p/testing/peerpool.go

@@ -0,0 +1,67 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package testing
+
+import (
+	"fmt"
+	"sync"
+
+	"github.com/ethereum/go-ethereum/log"
+	"github.com/ethereum/go-ethereum/p2p/discover"
+)
+
+type TestPeer interface {
+	ID() discover.NodeID
+	Drop(error)
+}
+
+// TestPeerPool is an example peerPool to demonstrate registration of peer connections
+type TestPeerPool struct {
+	lock  sync.Mutex
+	peers map[discover.NodeID]TestPeer
+}
+
+func NewTestPeerPool() *TestPeerPool {
+	return &TestPeerPool{peers: make(map[discover.NodeID]TestPeer)}
+}
+
+func (self *TestPeerPool) Add(p TestPeer) {
+	self.lock.Lock()
+	defer self.lock.Unlock()
+	log.Trace(fmt.Sprintf("pp add peer  %v", p.ID()))
+	self.peers[p.ID()] = p
+
+}
+
+func (self *TestPeerPool) Remove(p TestPeer) {
+	self.lock.Lock()
+	defer self.lock.Unlock()
+	delete(self.peers, p.ID())
+}
+
+func (self *TestPeerPool) Has(id discover.NodeID) bool {
+	self.lock.Lock()
+	defer self.lock.Unlock()
+	_, ok := self.peers[id]
+	return ok
+}
+
+func (self *TestPeerPool) Get(id discover.NodeID) TestPeer {
+	self.lock.Lock()
+	defer self.lock.Unlock()
+	return self.peers[id]
+}

+ 280 - 0
p2p/testing/protocolsession.go

@@ -0,0 +1,280 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package testing
+
+import (
+	"errors"
+	"fmt"
+	"sync"
+	"time"
+
+	"github.com/ethereum/go-ethereum/log"
+	"github.com/ethereum/go-ethereum/p2p"
+	"github.com/ethereum/go-ethereum/p2p/discover"
+	"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
+)
+
+var errTimedOut = errors.New("timed out")
+
+// ProtocolSession is a quasi simulation of a pivot node running
+// a service and a number of dummy peers that can send (trigger) or
+// receive (expect) messages
+type ProtocolSession struct {
+	Server  *p2p.Server
+	IDs     []discover.NodeID
+	adapter *adapters.SimAdapter
+	events  chan *p2p.PeerEvent
+}
+
+// Exchange is the basic units of protocol tests
+// the triggers and expects in the arrays are run immediately and asynchronously
+// thus one cannot have multiple expects for the SAME peer with DIFFERENT message types
+// because it's unpredictable which expect will receive which message
+// (with expect #1 and #2, messages might be sent #2 and #1, and both expects will complain about wrong message code)
+// an exchange is defined on a session
+type Exchange struct {
+	Label    string
+	Triggers []Trigger
+	Expects  []Expect
+	Timeout  time.Duration
+}
+
+// Trigger is part of the exchange, incoming message for the pivot node
+// sent by a peer
+type Trigger struct {
+	Msg     interface{}     // type of message to be sent
+	Code    uint64          // code of message is given
+	Peer    discover.NodeID // the peer to send the message to
+	Timeout time.Duration   // timeout duration for the sending
+}
+
+// Expect is part of an exchange, outgoing message from the pivot node
+// received by a peer
+type Expect struct {
+	Msg     interface{}     // type of message to expect
+	Code    uint64          // code of message is now given
+	Peer    discover.NodeID // the peer that expects the message
+	Timeout time.Duration   // timeout duration for receiving
+}
+
+// Disconnect represents a disconnect event, used and checked by TestDisconnected
+type Disconnect struct {
+	Peer  discover.NodeID // discconnected peer
+	Error error           // disconnect reason
+}
+
+// trigger sends messages from peers
+func (self *ProtocolSession) trigger(trig Trigger) error {
+	simNode, ok := self.adapter.GetNode(trig.Peer)
+	if !ok {
+		return fmt.Errorf("trigger: peer %v does not exist (1- %v)", trig.Peer, len(self.IDs))
+	}
+	mockNode, ok := simNode.Services()[0].(*mockNode)
+	if !ok {
+		return fmt.Errorf("trigger: peer %v is not a mock", trig.Peer)
+	}
+
+	errc := make(chan error)
+
+	go func() {
+		errc <- mockNode.Trigger(&trig)
+	}()
+
+	t := trig.Timeout
+	if t == time.Duration(0) {
+		t = 1000 * time.Millisecond
+	}
+	select {
+	case err := <-errc:
+		return err
+	case <-time.After(t):
+		return fmt.Errorf("timout expecting %v to send to peer %v", trig.Msg, trig.Peer)
+	}
+}
+
+// expect checks an expectation of a message sent out by the pivot node
+func (self *ProtocolSession) expect(exps []Expect) error {
+	// construct a map of expectations for each node
+	peerExpects := make(map[discover.NodeID][]Expect)
+	for _, exp := range exps {
+		if exp.Msg == nil {
+			return errors.New("no message to expect")
+		}
+		peerExpects[exp.Peer] = append(peerExpects[exp.Peer], exp)
+	}
+
+	// construct a map of mockNodes for each node
+	mockNodes := make(map[discover.NodeID]*mockNode)
+	for nodeID := range peerExpects {
+		simNode, ok := self.adapter.GetNode(nodeID)
+		if !ok {
+			return fmt.Errorf("trigger: peer %v does not exist (1- %v)", nodeID, len(self.IDs))
+		}
+		mockNode, ok := simNode.Services()[0].(*mockNode)
+		if !ok {
+			return fmt.Errorf("trigger: peer %v is not a mock", nodeID)
+		}
+		mockNodes[nodeID] = mockNode
+	}
+
+	// done chanell cancels all created goroutines when function returns
+	done := make(chan struct{})
+	defer close(done)
+	// errc catches the first error from
+	errc := make(chan error)
+
+	wg := &sync.WaitGroup{}
+	wg.Add(len(mockNodes))
+	for nodeID, mockNode := range mockNodes {
+		nodeID := nodeID
+		mockNode := mockNode
+		go func() {
+			defer wg.Done()
+
+			// Sum all Expect timeouts to give the maximum
+			// time for all expectations to finish.
+			// mockNode.Expect checks all received messages against
+			// a list of expected messages and timeout for each
+			// of them can not be checked separately.
+			var t time.Duration
+			for _, exp := range peerExpects[nodeID] {
+				if exp.Timeout == time.Duration(0) {
+					t += 2000 * time.Millisecond
+				} else {
+					t += exp.Timeout
+				}
+			}
+			alarm := time.NewTimer(t)
+			defer alarm.Stop()
+
+			// expectErrc is used to check if error returned
+			// from mockNode.Expect is not nil and to send it to
+			// errc only in that case.
+			// done channel will be closed when function
+			expectErrc := make(chan error)
+			go func() {
+				select {
+				case expectErrc <- mockNode.Expect(peerExpects[nodeID]...):
+				case <-done:
+				case <-alarm.C:
+				}
+			}()
+
+			select {
+			case err := <-expectErrc:
+				if err != nil {
+					select {
+					case errc <- err:
+					case <-done:
+					case <-alarm.C:
+						errc <- errTimedOut
+					}
+				}
+			case <-done:
+			case <-alarm.C:
+				errc <- errTimedOut
+			}
+
+		}()
+	}
+
+	go func() {
+		wg.Wait()
+		// close errc when all goroutines finish to return nill err from errc
+		close(errc)
+	}()
+
+	return <-errc
+}
+
+// TestExchanges tests a series of exchanges against the session
+func (self *ProtocolSession) TestExchanges(exchanges ...Exchange) error {
+	for i, e := range exchanges {
+		if err := self.testExchange(e); err != nil {
+			return fmt.Errorf("exchange #%d %q: %v", i, e.Label, err)
+		}
+		log.Trace(fmt.Sprintf("exchange #%d %q: run successfully", i, e.Label))
+	}
+	return nil
+}
+
+// testExchange tests a single Exchange.
+// Default timeout value is 2 seconds.
+func (self *ProtocolSession) testExchange(e Exchange) error {
+	errc := make(chan error)
+	done := make(chan struct{})
+	defer close(done)
+
+	go func() {
+		for _, trig := range e.Triggers {
+			err := self.trigger(trig)
+			if err != nil {
+				errc <- err
+				return
+			}
+		}
+
+		select {
+		case errc <- self.expect(e.Expects):
+		case <-done:
+		}
+	}()
+
+	// time out globally or finish when all expectations satisfied
+	t := e.Timeout
+	if t == 0 {
+		t = 2000 * time.Millisecond
+	}
+	alarm := time.NewTimer(t)
+	select {
+	case err := <-errc:
+		return err
+	case <-alarm.C:
+		return errTimedOut
+	}
+}
+
+// TestDisconnected tests the disconnections given as arguments
+// the disconnect structs describe what disconnect error is expected on which peer
+func (self *ProtocolSession) TestDisconnected(disconnects ...*Disconnect) error {
+	expects := make(map[discover.NodeID]error)
+	for _, disconnect := range disconnects {
+		expects[disconnect.Peer] = disconnect.Error
+	}
+
+	timeout := time.After(time.Second)
+	for len(expects) > 0 {
+		select {
+		case event := <-self.events:
+			if event.Type != p2p.PeerEventTypeDrop {
+				continue
+			}
+			expectErr, ok := expects[event.Peer]
+			if !ok {
+				continue
+			}
+
+			if !(expectErr == nil && event.Error == "" || expectErr != nil && expectErr.Error() == event.Error) {
+				return fmt.Errorf("unexpected error on peer %v. expected '%v', got '%v'", event.Peer, expectErr, event.Error)
+			}
+			delete(expects, event.Peer)
+		case <-timeout:
+			return fmt.Errorf("timed out waiting for peers to disconnect")
+		}
+	}
+	return nil
+}

+ 269 - 0
p2p/testing/protocoltester.go

@@ -0,0 +1,269 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+/*
+the p2p/testing package provides a unit test scheme to check simple
+protocol message exchanges with one pivot node and a number of dummy peers
+The pivot test node runs a node.Service, the dummy peers run a mock node
+that can be used to send and receive messages
+*/
+
+package testing
+
+import (
+	"bytes"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"strings"
+	"sync"
+	"testing"
+
+	"github.com/ethereum/go-ethereum/log"
+	"github.com/ethereum/go-ethereum/node"
+	"github.com/ethereum/go-ethereum/p2p"
+	"github.com/ethereum/go-ethereum/p2p/discover"
+	"github.com/ethereum/go-ethereum/p2p/simulations"
+	"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
+	"github.com/ethereum/go-ethereum/rlp"
+	"github.com/ethereum/go-ethereum/rpc"
+)
+
+// ProtocolTester is the tester environment used for unit testing protocol
+// message exchanges. It uses p2p/simulations framework
+type ProtocolTester struct {
+	*ProtocolSession
+	network *simulations.Network
+}
+
+// NewProtocolTester constructs a new ProtocolTester
+// it takes as argument the pivot node id, the number of dummy peers and the
+// protocol run function called on a peer connection by the p2p server
+func NewProtocolTester(t *testing.T, id discover.NodeID, n int, run func(*p2p.Peer, p2p.MsgReadWriter) error) *ProtocolTester {
+	services := adapters.Services{
+		"test": func(ctx *adapters.ServiceContext) (node.Service, error) {
+			return &testNode{run}, nil
+		},
+		"mock": func(ctx *adapters.ServiceContext) (node.Service, error) {
+			return newMockNode(), nil
+		},
+	}
+	adapter := adapters.NewSimAdapter(services)
+	net := simulations.NewNetwork(adapter, &simulations.NetworkConfig{})
+	if _, err := net.NewNodeWithConfig(&adapters.NodeConfig{
+		ID:              id,
+		EnableMsgEvents: true,
+		Services:        []string{"test"},
+	}); err != nil {
+		panic(err.Error())
+	}
+	if err := net.Start(id); err != nil {
+		panic(err.Error())
+	}
+
+	node := net.GetNode(id).Node.(*adapters.SimNode)
+	peers := make([]*adapters.NodeConfig, n)
+	peerIDs := make([]discover.NodeID, n)
+	for i := 0; i < n; i++ {
+		peers[i] = adapters.RandomNodeConfig()
+		peers[i].Services = []string{"mock"}
+		peerIDs[i] = peers[i].ID
+	}
+	events := make(chan *p2p.PeerEvent, 1000)
+	node.SubscribeEvents(events)
+	ps := &ProtocolSession{
+		Server:  node.Server(),
+		IDs:     peerIDs,
+		adapter: adapter,
+		events:  events,
+	}
+	self := &ProtocolTester{
+		ProtocolSession: ps,
+		network:         net,
+	}
+
+	self.Connect(id, peers...)
+
+	return self
+}
+
+// Stop stops the p2p server
+func (self *ProtocolTester) Stop() error {
+	self.Server.Stop()
+	return nil
+}
+
+// Connect brings up the remote peer node and connects it using the
+// p2p/simulations network connection with the in memory network adapter
+func (self *ProtocolTester) Connect(selfID discover.NodeID, peers ...*adapters.NodeConfig) {
+	for _, peer := range peers {
+		log.Trace(fmt.Sprintf("start node %v", peer.ID))
+		if _, err := self.network.NewNodeWithConfig(peer); err != nil {
+			panic(fmt.Sprintf("error starting peer %v: %v", peer.ID, err))
+		}
+		if err := self.network.Start(peer.ID); err != nil {
+			panic(fmt.Sprintf("error starting peer %v: %v", peer.ID, err))
+		}
+		log.Trace(fmt.Sprintf("connect to %v", peer.ID))
+		if err := self.network.Connect(selfID, peer.ID); err != nil {
+			panic(fmt.Sprintf("error connecting to peer %v: %v", peer.ID, err))
+		}
+	}
+
+}
+
+// testNode wraps a protocol run function and implements the node.Service
+// interface
+type testNode struct {
+	run func(*p2p.Peer, p2p.MsgReadWriter) error
+}
+
+func (t *testNode) Protocols() []p2p.Protocol {
+	return []p2p.Protocol{{
+		Length: 100,
+		Run:    t.run,
+	}}
+}
+
+func (t *testNode) APIs() []rpc.API {
+	return nil
+}
+
+func (t *testNode) Start(server *p2p.Server) error {
+	return nil
+}
+
+func (t *testNode) Stop() error {
+	return nil
+}
+
+// mockNode is a testNode which doesn't actually run a protocol, instead
+// exposing channels so that tests can manually trigger and expect certain
+// messages
+type mockNode struct {
+	testNode
+
+	trigger  chan *Trigger
+	expect   chan []Expect
+	err      chan error
+	stop     chan struct{}
+	stopOnce sync.Once
+}
+
+func newMockNode() *mockNode {
+	mock := &mockNode{
+		trigger: make(chan *Trigger),
+		expect:  make(chan []Expect),
+		err:     make(chan error),
+		stop:    make(chan struct{}),
+	}
+	mock.testNode.run = mock.Run
+	return mock
+}
+
+// Run is a protocol run function which just loops waiting for tests to
+// instruct it to either trigger or expect a message from the peer
+func (m *mockNode) Run(peer *p2p.Peer, rw p2p.MsgReadWriter) error {
+	for {
+		select {
+		case trig := <-m.trigger:
+			m.err <- p2p.Send(rw, trig.Code, trig.Msg)
+		case exps := <-m.expect:
+			m.err <- expectMsgs(rw, exps)
+		case <-m.stop:
+			return nil
+		}
+	}
+}
+
+func (m *mockNode) Trigger(trig *Trigger) error {
+	m.trigger <- trig
+	return <-m.err
+}
+
+func (m *mockNode) Expect(exp ...Expect) error {
+	m.expect <- exp
+	return <-m.err
+}
+
+func (m *mockNode) Stop() error {
+	m.stopOnce.Do(func() { close(m.stop) })
+	return nil
+}
+
+func expectMsgs(rw p2p.MsgReadWriter, exps []Expect) error {
+	matched := make([]bool, len(exps))
+	for {
+		msg, err := rw.ReadMsg()
+		if err != nil {
+			if err == io.EOF {
+				break
+			}
+			return err
+		}
+		actualContent, err := ioutil.ReadAll(msg.Payload)
+		if err != nil {
+			return err
+		}
+		var found bool
+		for i, exp := range exps {
+			if exp.Code == msg.Code && bytes.Equal(actualContent, mustEncodeMsg(exp.Msg)) {
+				if matched[i] {
+					return fmt.Errorf("message #%d received two times", i)
+				}
+				matched[i] = true
+				found = true
+				break
+			}
+		}
+		if !found {
+			expected := make([]string, 0)
+			for i, exp := range exps {
+				if matched[i] {
+					continue
+				}
+				expected = append(expected, fmt.Sprintf("code %d payload %x", exp.Code, mustEncodeMsg(exp.Msg)))
+			}
+			return fmt.Errorf("unexpected message code %d payload %x, expected %s", msg.Code, actualContent, strings.Join(expected, " or "))
+		}
+		done := true
+		for _, m := range matched {
+			if !m {
+				done = false
+				break
+			}
+		}
+		if done {
+			return nil
+		}
+	}
+	for i, m := range matched {
+		if !m {
+			return fmt.Errorf("expected message #%d not received", i)
+		}
+	}
+	return nil
+}
+
+// mustEncodeMsg uses rlp to encode a message.
+// In case of error it panics.
+func mustEncodeMsg(msg interface{}) []byte {
+	contentEnc, err := rlp.EncodeToBytes(msg)
+	if err != nil {
+		panic("content encode error: " + err.Error())
+	}
+	return contentEnc
+}