Jelajahi Sumber

transport加入

skyfffire 2 tahun lalu
induk
melakukan
693e48738c
6 mengubah file dengan 1017 tambahan dan 7 penghapusan
  1. 9 0
      go.mod
  2. 25 0
      go.sum
  3. 94 0
      p2p/metrics.go
  4. 690 0
      p2p/rlpx/rlpx.go
  5. 7 7
      p2p/server.go
  6. 192 0
      p2p/transport.go

+ 9 - 0
go.mod

@@ -3,18 +3,27 @@ module blockchain-go
 go 1.18
 
 require (
+	github.com/VictoriaMetrics/fastcache v1.6.0
 	github.com/ethereum/go-ethereum v1.11.5
 	github.com/go-stack/stack v1.8.1
+	github.com/golang/snappy v0.0.4
 	github.com/huin/goupnp v1.0.3
 	github.com/jackpal/go-nat-pmp v1.0.2
+	github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c
 	github.com/panjf2000/ants/v2 v2.7.2
 	golang.org/x/crypto v0.7.0
 )
 
 require (
+	github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6 // indirect
 	github.com/btcsuite/btcd/btcec/v2 v2.2.0 // indirect
+	github.com/cespare/xxhash/v2 v2.2.0 // indirect
 	github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect
+	github.com/go-ole/go-ole v1.2.1 // indirect
 	github.com/holiman/uint256 v1.2.0 // indirect
+	github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible // indirect
+	github.com/tklauser/go-sysconf v0.3.5 // indirect
+	github.com/tklauser/numcpus v0.2.2 // indirect
 	golang.org/x/sync v0.1.0 // indirect
 	golang.org/x/sys v0.6.0 // indirect
 )

+ 25 - 0
go.sum

@@ -1,6 +1,15 @@
+github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6 h1:fLjPD/aNc3UIOA6tDi6QXUemppXK3P9BI7mr2hd6gx8=
+github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
+github.com/VictoriaMetrics/fastcache v1.6.0 h1:C/3Oi3EiBCqufydp1neRZkqcwmEiuRT9c3fqvvgKm5o=
+github.com/VictoriaMetrics/fastcache v1.6.0/go.mod h1:0qHz5QP0GMX4pfmMA/zt5RgfNuXJrTP0zS7DqpHGGTw=
+github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah4HI848JfFxHt+iPb26b4zyfspmqY0/8=
+github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM=
 github.com/btcsuite/btcd/btcec/v2 v2.2.0 h1:fzn1qaOt32TuLjFlkzYSsBC35Q3KUjT1SwPxiMSCF5k=
 github.com/btcsuite/btcd/btcec/v2 v2.2.0/go.mod h1:U7MHm051Al6XmscBQ0BoNydpOTsFAn707034b5nY8zU=
 github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1 h1:q0rUy8C/TYNBQS1+CGKw68tLOFYSNEs0TFnxxnS9+4U=
+github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
+github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
+github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@@ -10,8 +19,13 @@ github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 h1:YLtO71vCjJRCBcrPMtQ9nqBsqpA1
 github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1/go.mod h1:hyedUtir6IdtD/7lIxGeCxkaw7y45JueMRL4DIyJDKs=
 github.com/ethereum/go-ethereum v1.11.5 h1:3M1uan+LAUvdn+7wCEFrcMM4LJTeuxDrPTg/f31a5QQ=
 github.com/ethereum/go-ethereum v1.11.5/go.mod h1:it7x0DWnTDMfVFdXcU6Ti4KEFQynLHVRarcSlPr0HBo=
+github.com/go-ole/go-ole v1.2.1 h1:2lOsA72HgjxAuMlKpFiCbHTvu44PIVkZ5hqm3RSdI/E=
+github.com/go-ole/go-ole v1.2.1/go.mod h1:7FAglXiTm7HKlQRDeOQ6ZNUHidzCWXuZWq/1dTyBNF8=
 github.com/go-stack/stack v1.8.1 h1:ntEHSVwIt7PNXNpgPmVfMrNhLtgjlmnZha2kOpuRiDw=
 github.com/go-stack/stack v1.8.1/go.mod h1:dcoOX6HbPZSZptuspn9bctJ+N/CnF5gGygcUP3XYfe4=
+github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
+github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
+github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
 github.com/holiman/uint256 v1.2.0 h1:gpSYcPLWGv4sG43I2mVLiDZCNDh/EpGjSk8tmtxitHM=
 github.com/holiman/uint256 v1.2.0/go.mod h1:y4ga/t+u+Xwd7CpDgZESaRcWy0I7XMlTMA25ApIH5Jw=
 github.com/huin/goupnp v1.0.3 h1:N8No57ls+MnjlB+JPiCVSOyy/ot7MJTqlo7rn+NYSqQ=
@@ -19,22 +33,33 @@ github.com/huin/goupnp v1.0.3/go.mod h1:ZxNlw5WqJj6wSsRK5+YfflQGXYfccj5VgQsMNixH
 github.com/huin/goutil v0.0.0-20170803182201-1ca381bf3150/go.mod h1:PpLOETDnJ0o3iZrZfqZzyLl6l7F3c6L1oWn7OICBi6o=
 github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus=
 github.com/jackpal/go-nat-pmp v1.0.2/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc=
+github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c h1:rp5dCmg/yLR3mgFuSOe4oEnDDmGLROTvMragMUXpTQw=
+github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c/go.mod h1:X07ZCGwUbLaax7L0S3Tw4hpejzu63ZrrQiUe6W0hcy0=
 github.com/panjf2000/ants/v2 v2.7.2 h1:2NUt9BaZFO5kQzrieOmK/wdb/tQ/K+QHaxN8sOgD63U=
 github.com/panjf2000/ants/v2 v2.7.2/go.mod h1:KIBmYG9QQX5U2qzFP/yQJaq/nSb6rahS9iEHkrCMgM8=
 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible h1:Bn1aCHHRnjv4Bl16T8rcaFjYSrGrIZvpiGO6P3Q4GpU=
+github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
 github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
 github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
+github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
 github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
 github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
 github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
 github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
+github.com/tklauser/go-sysconf v0.3.5 h1:uu3Xl4nkLzQfXNsWn15rPc/HQCJKObbt1dKJeWp3vU4=
+github.com/tklauser/go-sysconf v0.3.5/go.mod h1:MkWzOF4RMCshBAMXuhXJs64Rte09mITnppBXY/rYEFI=
+github.com/tklauser/numcpus v0.2.2 h1:oyhllyrScuYI6g+h/zUvNXNp1wy7x8qQy3t/piefldA=
+github.com/tklauser/numcpus v0.2.2/go.mod h1:x3qojaO3uyYt0i56EW/VUYs7uBvdl2fkfZFu0T9wgjM=
 golang.org/x/crypto v0.7.0 h1:AvwMYaRytfdeVt3u6mLaxYtErKYjxA2OXjJ1HHq6t3A=
 golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU=
 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
 golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sys v0.0.0-20210316164454-77fc1eacc6aa/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210324051608-47abb6519492/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ=
 golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=

+ 94 - 0
p2p/metrics.go

@@ -0,0 +1,94 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// Contains the meters and timers used by the networking layer.
+
+package p2p
+
+import (
+	"net"
+
+	"github.com/ethereum/go-ethereum/metrics"
+)
+
+const (
+	// ingressMeterName is the prefix of the per-packet inbound metrics.
+	ingressMeterName = "p2p/ingress"
+
+	// egressMeterName is the prefix of the per-packet outbound metrics.
+	egressMeterName = "p2p/egress"
+
+	// HandleHistName is the prefix of the per-packet serving time histograms.
+	HandleHistName = "p2p/handle"
+)
+
+var (
+	ingressConnectMeter = metrics.NewRegisteredMeter("p2p/serves", nil)
+	ingressTrafficMeter = metrics.NewRegisteredMeter(ingressMeterName, nil)
+	egressConnectMeter  = metrics.NewRegisteredMeter("p2p/dials", nil)
+	egressTrafficMeter  = metrics.NewRegisteredMeter(egressMeterName, nil)
+	activePeerGauge     = metrics.NewRegisteredGauge("p2p/peers", nil)
+)
+
+// meteredConn is a wrapper around a net.Conn that meters both the
+// inbound and outbound network traffic.
+type meteredConn struct {
+	net.Conn
+}
+
+// newMeteredConn creates a new metered connection, bumps the ingress or egress
+// connection meter and also increases the metered peer count. If the metrics
+// system is disabled, function returns the original connection.
+func newMeteredConn(conn net.Conn, ingress bool, addr *net.TCPAddr) net.Conn {
+	// Short circuit if metrics are disabled
+	if !metrics.Enabled {
+		return conn
+	}
+	// Bump the connection counters and wrap the connection
+	if ingress {
+		ingressConnectMeter.Mark(1)
+	} else {
+		egressConnectMeter.Mark(1)
+	}
+	activePeerGauge.Inc(1)
+	return &meteredConn{Conn: conn}
+}
+
+// Read delegates a network read to the underlying connection, bumping the common
+// and the peer ingress traffic meters along the way.
+func (c *meteredConn) Read(b []byte) (n int, err error) {
+	n, err = c.Conn.Read(b)
+	ingressTrafficMeter.Mark(int64(n))
+	return n, err
+}
+
+// Write delegates a network write to the underlying connection, bumping the common
+// and the peer egress traffic meters along the way.
+func (c *meteredConn) Write(b []byte) (n int, err error) {
+	n, err = c.Conn.Write(b)
+	egressTrafficMeter.Mark(int64(n))
+	return n, err
+}
+
+// Close delegates a close operation to the underlying connection, unregisters
+// the peer from the traffic registries and emits close event.
+func (c *meteredConn) Close() error {
+	err := c.Conn.Close()
+	if err == nil {
+		activePeerGauge.Dec(1)
+	}
+	return err
+}

+ 690 - 0
p2p/rlpx/rlpx.go

@@ -0,0 +1,690 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// Package rlpx implements the RLPx transport protocol.
+package rlpx
+
+import (
+	"bytes"
+	"crypto/aes"
+	"crypto/cipher"
+	"crypto/ecdsa"
+	"crypto/elliptic"
+	"crypto/hmac"
+	"crypto/rand"
+	"encoding/binary"
+	"errors"
+	"fmt"
+	"hash"
+	"io"
+	mrand "math/rand"
+	"net"
+	"time"
+
+	"github.com/VictoriaMetrics/fastcache"
+	"github.com/golang/snappy"
+	"github.com/oxtoacart/bpool"
+	"golang.org/x/crypto/sha3"
+
+	"blockchain-go/rlp"
+	"github.com/ethereum/go-ethereum/crypto"
+	"github.com/ethereum/go-ethereum/crypto/ecies"
+)
+
+var snappyCache *fastcache.Cache
+
+func init() {
+	snappyCache = fastcache.New(50 * 1024 * 1024)
+}
+
+// Conn is an RLPx network connection. It wraps a low-level network connection. The
+// underlying connection should not be used for other activity when it is wrapped by Conn.
+//
+// Before sending messages, a handshake must be performed by calling the Handshake method.
+// This type is not generally safe for concurrent use, but reading and writing of messages
+// may happen concurrently after the handshake.
+type Conn struct {
+	dialDest  *ecdsa.PublicKey
+	conn      net.Conn
+	handshake *handshakeState
+	snappy    bool
+}
+
+type handshakeState struct {
+	enc cipher.Stream
+	dec cipher.Stream
+
+	macCipher  cipher.Block
+	egressMAC  hash.Hash
+	ingressMAC hash.Hash
+}
+
+// NewConn wraps the given network connection. If dialDest is non-nil, the connection
+// behaves as the initiator during the handshake.
+func NewConn(conn net.Conn, dialDest *ecdsa.PublicKey) *Conn {
+	return &Conn{
+		dialDest: dialDest,
+		conn:     conn,
+	}
+}
+
+// SetSnappy enables or disables snappy compression of messages. This is usually called
+// after the devp2p Hello message exchange when the negotiated version indicates that
+// compression is available on both ends of the connection.
+func (c *Conn) SetSnappy(snappy bool) {
+	c.snappy = snappy
+}
+
+// SetReadDeadline sets the deadline for all future read operations.
+func (c *Conn) SetReadDeadline(time time.Time) error {
+	return c.conn.SetReadDeadline(time)
+}
+
+// SetWriteDeadline sets the deadline for all future write operations.
+func (c *Conn) SetWriteDeadline(time time.Time) error {
+	return c.conn.SetWriteDeadline(time)
+}
+
+// SetDeadline sets the deadline for all future read and write operations.
+func (c *Conn) SetDeadline(time time.Time) error {
+	return c.conn.SetDeadline(time)
+}
+
+// Read reads a message from the connection.
+func (c *Conn) Read() (code uint64, data []byte, wireSize int, err error) {
+	if c.handshake == nil {
+		panic("can't ReadMsg before handshake")
+	}
+
+	frame, err := c.handshake.readFrame(c.conn)
+	if err != nil {
+		return 0, nil, 0, err
+	}
+	code, data, err = rlp.SplitUint64(frame)
+	if err != nil {
+		return 0, nil, 0, fmt.Errorf("invalid message code: %v", err)
+	}
+	wireSize = len(data)
+
+	// If snappy is enabled, verify and decompress message.
+	if c.snappy {
+		var actualSize int
+		actualSize, err = snappy.DecodedLen(data)
+		if err != nil {
+			return code, nil, 0, err
+		}
+		if actualSize > maxUint24 {
+			return code, nil, 0, errPlainMessageTooLarge
+		}
+		data, err = snappy.Decode(nil, data)
+	}
+	return code, data, wireSize, err
+}
+
+func (h *handshakeState) readFrame(conn io.Reader) ([]byte, error) {
+	// read the header
+	headbuf := make([]byte, 32)
+	if _, err := io.ReadFull(conn, headbuf); err != nil {
+		return nil, err
+	}
+
+	// verify header mac
+	shouldMAC := updateMAC(h.ingressMAC, h.macCipher, headbuf[:16])
+	if !hmac.Equal(shouldMAC, headbuf[16:]) {
+		return nil, errors.New("bad header MAC")
+	}
+	h.dec.XORKeyStream(headbuf[:16], headbuf[:16]) // first half is now decrypted
+	fsize := readInt24(headbuf)
+	// ignore protocol type for now
+
+	// read the frame content
+	var rsize = fsize // frame size rounded up to 16 byte boundary
+	if padding := fsize % 16; padding > 0 {
+		rsize += 16 - padding
+	}
+	framebuf := make([]byte, rsize)
+	if _, err := io.ReadFull(conn, framebuf); err != nil {
+		return nil, err
+	}
+
+	// read and validate frame MAC. we can re-use headbuf for that.
+	h.ingressMAC.Write(framebuf)
+	fmacseed := h.ingressMAC.Sum(nil)
+	if _, err := io.ReadFull(conn, headbuf[:16]); err != nil {
+		return nil, err
+	}
+	shouldMAC = updateMAC(h.ingressMAC, h.macCipher, fmacseed)
+	if !hmac.Equal(shouldMAC, headbuf[:16]) {
+		return nil, errors.New("bad frame MAC")
+	}
+
+	// decrypt frame content
+	h.dec.XORKeyStream(framebuf, framebuf)
+	return framebuf[:fsize], nil
+}
+
+// Write writes a message to the connection.
+//
+// Write returns the written size of the message data. This may be less than or equal to
+// len(data) depending on whether snappy compression is enabled.
+func (c *Conn) Write(code uint64, data []byte) (uint32, error) {
+	if c.handshake == nil {
+		panic("can't WriteMsg before handshake")
+	}
+	if len(data) > maxUint24 {
+		return 0, errPlainMessageTooLarge
+	}
+	if c.snappy {
+		if encodedResult, ok := snappyCache.HasGet(nil, data); ok {
+			data = encodedResult
+		} else {
+			encodedData := snappy.Encode(nil, data)
+			snappyCache.Set(data, encodedData)
+
+			data = encodedData
+		}
+	}
+
+	wireSize := uint32(len(data))
+	err := c.handshake.writeFrame(c.conn, code, data)
+	return wireSize, err
+}
+
+func (h *handshakeState) writeFrame(conn io.Writer, code uint64, data []byte) error {
+	ptype, _ := rlp.EncodeToBytes(code)
+
+	// write header
+	headbuf := make([]byte, 32)
+	fsize := len(ptype) + len(data)
+	if fsize > maxUint24 {
+		return errPlainMessageTooLarge
+	}
+	putInt24(uint32(fsize), headbuf)
+	copy(headbuf[3:], zeroHeader)
+	h.enc.XORKeyStream(headbuf[:16], headbuf[:16]) // first half is now encrypted
+
+	// write header MAC
+	copy(headbuf[16:], updateMAC(h.egressMAC, h.macCipher, headbuf[:16]))
+	if _, err := conn.Write(headbuf); err != nil {
+		return err
+	}
+
+	// write encrypted frame, updating the egress MAC hash with
+	// the data written to conn.
+	tee := cipher.StreamWriter{S: h.enc, W: io.MultiWriter(conn, h.egressMAC)}
+	if _, err := tee.Write(ptype); err != nil {
+		return err
+	}
+	if _, err := tee.Write(data); err != nil {
+		return err
+	}
+	if padding := fsize % 16; padding > 0 {
+		if _, err := tee.Write(zero16[:16-padding]); err != nil {
+			return err
+		}
+	}
+
+	// write frame MAC. egress MAC hash is up to date because
+	// frame content was written to it as well.
+	fmacseed := h.egressMAC.Sum(nil)
+	mac := updateMAC(h.egressMAC, h.macCipher, fmacseed)
+	_, err := conn.Write(mac)
+	return err
+}
+
+func readInt24(b []byte) uint32 {
+	return uint32(b[2]) | uint32(b[1])<<8 | uint32(b[0])<<16
+}
+
+func putInt24(v uint32, b []byte) {
+	b[0] = byte(v >> 16)
+	b[1] = byte(v >> 8)
+	b[2] = byte(v)
+}
+
+const BpoolMaxSize = 4
+
+var bytepool = bpool.NewBytePool(BpoolMaxSize, aes.BlockSize)
+
+// updateMAC reseeds the given hash with encrypted seed.
+// it returns the first 16 bytes of the hash sum after seeding.
+func updateMAC(mac hash.Hash, block cipher.Block, seed []byte) []byte {
+	aesbuf := bytepool.Get()
+	block.Encrypt(aesbuf, mac.Sum(nil))
+	for i := range aesbuf {
+		aesbuf[i] ^= seed[i]
+	}
+	mac.Write(aesbuf)
+	bytepool.Put(aesbuf)
+	return mac.Sum(nil)[:16]
+}
+
+// Handshake performs the handshake. This must be called before any data is written
+// or read from the connection.
+func (c *Conn) Handshake(prv *ecdsa.PrivateKey) (*ecdsa.PublicKey, error) {
+	var (
+		sec Secrets
+		err error
+	)
+	if c.dialDest != nil {
+		sec, err = initiatorEncHandshake(c.conn, prv, c.dialDest)
+	} else {
+		sec, err = receiverEncHandshake(c.conn, prv)
+	}
+	if err != nil {
+		return nil, err
+	}
+	c.InitWithSecrets(sec)
+	return sec.remote, err
+}
+
+// InitWithSecrets injects connection secrets as if a handshake had
+// been performed. This cannot be called after the handshake.
+func (c *Conn) InitWithSecrets(sec Secrets) {
+	if c.handshake != nil {
+		panic("can't handshake twice")
+	}
+	macc, err := aes.NewCipher(sec.MAC)
+	if err != nil {
+		panic("invalid MAC secret: " + err.Error())
+	}
+	encc, err := aes.NewCipher(sec.AES)
+	if err != nil {
+		panic("invalid AES secret: " + err.Error())
+	}
+	// we use an all-zeroes IV for AES because the key used
+	// for encryption is ephemeral.
+	iv := make([]byte, encc.BlockSize())
+	c.handshake = &handshakeState{
+		enc:        cipher.NewCTR(encc, iv),
+		dec:        cipher.NewCTR(encc, iv),
+		macCipher:  macc,
+		egressMAC:  sec.EgressMAC,
+		ingressMAC: sec.IngressMAC,
+	}
+}
+
+// Close closes the underlying network connection.
+func (c *Conn) Close() error {
+	return c.conn.Close()
+}
+
+// Constants for the handshake.
+const (
+	maxUint24 = int(^uint32(0) >> 8)
+
+	sskLen = 16                     // ecies.MaxSharedKeyLength(pubKey) / 2
+	sigLen = crypto.SignatureLength // elliptic S256
+	pubLen = 64                     // 512 bit pubkey in uncompressed representation without format byte
+	shaLen = 32                     // hash length (for nonce etc)
+
+	authMsgLen  = sigLen + shaLen + pubLen + shaLen + 1
+	authRespLen = pubLen + shaLen + 1
+
+	eciesOverhead = 65 /* pubkey */ + 16 /* IV */ + 32 /* MAC */
+
+	encAuthMsgLen  = authMsgLen + eciesOverhead  // size of encrypted pre-EIP-8 initiator handshake
+	encAuthRespLen = authRespLen + eciesOverhead // size of encrypted pre-EIP-8 handshake reply
+)
+
+var (
+	// this is used in place of actual frame header data.
+	// TODO: replace this when Msg contains the protocol type code.
+	zeroHeader = []byte{0xC2, 0x80, 0x80}
+	// sixteen zero bytes
+	zero16 = make([]byte, 16)
+
+	// errPlainMessageTooLarge is returned if a decompressed message length exceeds
+	// the allowed 24 bits (i.e. length >= 16MB).
+	errPlainMessageTooLarge = errors.New("message length >= 16MB")
+)
+
+// Secrets represents the connection secrets which are negotiated during the handshake.
+type Secrets struct {
+	AES, MAC              []byte
+	EgressMAC, IngressMAC hash.Hash
+	remote                *ecdsa.PublicKey
+}
+
+// encHandshake contains the state of the encryption handshake.
+type encHandshake struct {
+	initiator            bool
+	remote               *ecies.PublicKey  // remote-pubk
+	initNonce, respNonce []byte            // nonce
+	randomPrivKey        *ecies.PrivateKey // ecdhe-random
+	remoteRandomPub      *ecies.PublicKey  // ecdhe-random-pubk
+}
+
+// RLPx v4 handshake auth (defined in EIP-8).
+type authMsgV4 struct {
+	gotPlain bool // whether read packet had plain format.
+
+	Signature       [sigLen]byte
+	InitiatorPubkey [pubLen]byte
+	Nonce           [shaLen]byte
+	Version         uint
+
+	// Ignore additional fields (forward-compatibility)
+	Rest []rlp.RawValue `rlp:"tail"`
+}
+
+// RLPx v4 handshake response (defined in EIP-8).
+type authRespV4 struct {
+	RandomPubkey [pubLen]byte
+	Nonce        [shaLen]byte
+	Version      uint
+
+	// Ignore additional fields (forward-compatibility)
+	Rest []rlp.RawValue `rlp:"tail"`
+}
+
+// receiverEncHandshake negotiates a session token on conn.
+// it should be called on the listening side of the connection.
+//
+// prv is the local client's private key.
+func receiverEncHandshake(conn io.ReadWriter, prv *ecdsa.PrivateKey) (s Secrets, err error) {
+	authMsg := new(authMsgV4)
+	authPacket, err := readHandshakeMsg(authMsg, encAuthMsgLen, prv, conn)
+	if err != nil {
+		return s, err
+	}
+	h := new(encHandshake)
+	if err := h.handleAuthMsg(authMsg, prv); err != nil {
+		return s, err
+	}
+
+	authRespMsg, err := h.makeAuthResp()
+	if err != nil {
+		return s, err
+	}
+	var authRespPacket []byte
+	if authMsg.gotPlain {
+		authRespPacket, err = authRespMsg.sealPlain(h)
+	} else {
+		authRespPacket, err = sealEIP8(authRespMsg, h)
+	}
+	if err != nil {
+		return s, err
+	}
+	if _, err = conn.Write(authRespPacket); err != nil {
+		return s, err
+	}
+	return h.secrets(authPacket, authRespPacket)
+}
+
+func (h *encHandshake) handleAuthMsg(msg *authMsgV4, prv *ecdsa.PrivateKey) error {
+	// Import the remote identity.
+	rpub, err := importPublicKey(msg.InitiatorPubkey[:])
+	if err != nil {
+		return err
+	}
+	h.initNonce = msg.Nonce[:]
+	h.remote = rpub
+
+	// Generate random keypair for ECDH.
+	// If a private key is already set, use it instead of generating one (for testing).
+	if h.randomPrivKey == nil {
+		h.randomPrivKey, err = ecies.GenerateKey(rand.Reader, crypto.S256(), nil)
+		if err != nil {
+			return err
+		}
+	}
+
+	// Check the signature.
+	token, err := h.staticSharedSecret(prv)
+	if err != nil {
+		return err
+	}
+	signedMsg := xor(token, h.initNonce)
+	remoteRandomPub, err := crypto.Ecrecover(signedMsg, msg.Signature[:])
+	if err != nil {
+		return err
+	}
+	h.remoteRandomPub, _ = importPublicKey(remoteRandomPub)
+	return nil
+}
+
+// secrets is called after the handshake is completed.
+// It extracts the connection secrets from the handshake values.
+func (h *encHandshake) secrets(auth, authResp []byte) (Secrets, error) {
+	ecdheSecret, err := h.randomPrivKey.GenerateShared(h.remoteRandomPub, sskLen, sskLen)
+	if err != nil {
+		return Secrets{}, err
+	}
+
+	// derive base secrets from ephemeral key agreement
+	sharedSecret := crypto.Keccak256(ecdheSecret, crypto.Keccak256(h.respNonce, h.initNonce))
+	aesSecret := crypto.Keccak256(ecdheSecret, sharedSecret)
+	s := Secrets{
+		remote: h.remote.ExportECDSA(),
+		AES:    aesSecret,
+		MAC:    crypto.Keccak256(ecdheSecret, aesSecret),
+	}
+
+	// setup sha3 instances for the MACs
+	mac1 := sha3.NewLegacyKeccak256()
+	mac1.Write(xor(s.MAC, h.respNonce))
+	mac1.Write(auth)
+	mac2 := sha3.NewLegacyKeccak256()
+	mac2.Write(xor(s.MAC, h.initNonce))
+	mac2.Write(authResp)
+	if h.initiator {
+		s.EgressMAC, s.IngressMAC = mac1, mac2
+	} else {
+		s.EgressMAC, s.IngressMAC = mac2, mac1
+	}
+
+	return s, nil
+}
+
+// staticSharedSecret returns the static shared secret, the result
+// of key agreement between the local and remote static node key.
+func (h *encHandshake) staticSharedSecret(prv *ecdsa.PrivateKey) ([]byte, error) {
+	return ecies.ImportECDSA(prv).GenerateShared(h.remote, sskLen, sskLen)
+}
+
+// initiatorEncHandshake negotiates a session token on conn.
+// it should be called on the dialing side of the connection.
+//
+// prv is the local client's private key.
+func initiatorEncHandshake(conn io.ReadWriter, prv *ecdsa.PrivateKey, remote *ecdsa.PublicKey) (s Secrets, err error) {
+	h := &encHandshake{initiator: true, remote: ecies.ImportECDSAPublic(remote)}
+	authMsg, err := h.makeAuthMsg(prv)
+	if err != nil {
+		return s, err
+	}
+	authPacket, err := sealEIP8(authMsg, h)
+	if err != nil {
+		return s, err
+	}
+
+	if _, err = conn.Write(authPacket); err != nil {
+		return s, err
+	}
+
+	authRespMsg := new(authRespV4)
+	authRespPacket, err := readHandshakeMsg(authRespMsg, encAuthRespLen, prv, conn)
+	if err != nil {
+		return s, err
+	}
+	if err := h.handleAuthResp(authRespMsg); err != nil {
+		return s, err
+	}
+	return h.secrets(authPacket, authRespPacket)
+}
+
+// makeAuthMsg creates the initiator handshake message.
+func (h *encHandshake) makeAuthMsg(prv *ecdsa.PrivateKey) (*authMsgV4, error) {
+	// Generate random initiator nonce.
+	h.initNonce = make([]byte, shaLen)
+	_, err := rand.Read(h.initNonce)
+	if err != nil {
+		return nil, err
+	}
+	// Generate random keypair to for ECDH.
+	h.randomPrivKey, err = ecies.GenerateKey(rand.Reader, crypto.S256(), nil)
+	if err != nil {
+		return nil, err
+	}
+
+	// Sign known message: static-shared-secret ^ nonce
+	token, err := h.staticSharedSecret(prv)
+	if err != nil {
+		return nil, err
+	}
+	signed := xor(token, h.initNonce)
+	signature, err := crypto.Sign(signed, h.randomPrivKey.ExportECDSA())
+	if err != nil {
+		return nil, err
+	}
+
+	msg := new(authMsgV4)
+	copy(msg.Signature[:], signature)
+	copy(msg.InitiatorPubkey[:], crypto.FromECDSAPub(&prv.PublicKey)[1:])
+	copy(msg.Nonce[:], h.initNonce)
+	msg.Version = 4
+	return msg, nil
+}
+
+func (h *encHandshake) handleAuthResp(msg *authRespV4) (err error) {
+	h.respNonce = msg.Nonce[:]
+	h.remoteRandomPub, err = importPublicKey(msg.RandomPubkey[:])
+	return err
+}
+
+func (h *encHandshake) makeAuthResp() (msg *authRespV4, err error) {
+	// Generate random nonce.
+	h.respNonce = make([]byte, shaLen)
+	if _, err = rand.Read(h.respNonce); err != nil {
+		return nil, err
+	}
+
+	msg = new(authRespV4)
+	copy(msg.Nonce[:], h.respNonce)
+	copy(msg.RandomPubkey[:], exportPubkey(&h.randomPrivKey.PublicKey))
+	msg.Version = 4
+	return msg, nil
+}
+
+func (msg *authMsgV4) decodePlain(input []byte) {
+	n := copy(msg.Signature[:], input)
+	n += shaLen // skip sha3(initiator-ephemeral-pubk)
+	n += copy(msg.InitiatorPubkey[:], input[n:])
+	copy(msg.Nonce[:], input[n:])
+	msg.Version = 4
+	msg.gotPlain = true
+}
+
+func (msg *authRespV4) sealPlain(hs *encHandshake) ([]byte, error) {
+	buf := make([]byte, authRespLen)
+	n := copy(buf, msg.RandomPubkey[:])
+	copy(buf[n:], msg.Nonce[:])
+	return ecies.Encrypt(rand.Reader, hs.remote, buf, nil, nil)
+}
+
+func (msg *authRespV4) decodePlain(input []byte) {
+	n := copy(msg.RandomPubkey[:], input)
+	copy(msg.Nonce[:], input[n:])
+	msg.Version = 4
+}
+
+var padSpace = make([]byte, 300)
+
+func sealEIP8(msg interface{}, h *encHandshake) ([]byte, error) {
+	buf := new(bytes.Buffer)
+	if err := rlp.Encode(buf, msg); err != nil {
+		return nil, err
+	}
+	// pad with random amount of data. the amount needs to be at least 100 bytes to make
+	// the message distinguishable from pre-EIP-8 handshakes.
+	pad := padSpace[:mrand.Intn(len(padSpace)-100)+100]
+	buf.Write(pad)
+	prefix := make([]byte, 2)
+	binary.BigEndian.PutUint16(prefix, uint16(buf.Len()+eciesOverhead))
+
+	enc, err := ecies.Encrypt(rand.Reader, h.remote, buf.Bytes(), nil, prefix)
+	return append(prefix, enc...), err
+}
+
+type plainDecoder interface {
+	decodePlain([]byte)
+}
+
+func readHandshakeMsg(msg plainDecoder, plainSize int, prv *ecdsa.PrivateKey, r io.Reader) ([]byte, error) {
+	buf := make([]byte, plainSize)
+	if _, err := io.ReadFull(r, buf); err != nil {
+		return buf, err
+	}
+	// Attempt decoding pre-EIP-8 "plain" format.
+	key := ecies.ImportECDSA(prv)
+	if dec, err := key.Decrypt(buf, nil, nil); err == nil {
+		msg.decodePlain(dec)
+		return buf, nil
+	}
+	// Could be EIP-8 format, try that.
+	prefix := buf[:2]
+	size := binary.BigEndian.Uint16(prefix)
+	if size < uint16(plainSize) {
+		return buf, fmt.Errorf("size underflow, need at least %d bytes", plainSize)
+	}
+	buf = append(buf, make([]byte, size-uint16(plainSize)+2)...)
+	if _, err := io.ReadFull(r, buf[plainSize:]); err != nil {
+		return buf, err
+	}
+	dec, err := key.Decrypt(buf[2:], nil, prefix)
+	if err != nil {
+		return buf, err
+	}
+	// Can't use rlp.DecodeBytes here because it rejects
+	// trailing data (forward-compatibility).
+	s := rlp.NewStream(bytes.NewReader(dec), 0)
+	return buf, s.Decode(msg)
+}
+
+// importPublicKey unmarshals 512 bit public keys.
+func importPublicKey(pubKey []byte) (*ecies.PublicKey, error) {
+	var pubKey65 []byte
+	switch len(pubKey) {
+	case 64:
+		// add 'uncompressed key' flag
+		pubKey65 = append([]byte{0x04}, pubKey...)
+	case 65:
+		pubKey65 = pubKey
+	default:
+		return nil, fmt.Errorf("invalid public key length %v (expect 64/65)", len(pubKey))
+	}
+	// TODO: fewer pointless conversions
+	pub, err := crypto.UnmarshalPubkey(pubKey65)
+	if err != nil {
+		return nil, err
+	}
+	return ecies.ImportECDSAPublic(pub), nil
+}
+
+func exportPubkey(pub *ecies.PublicKey) []byte {
+	if pub == nil {
+		panic("nil pubkey")
+	}
+	return elliptic.Marshal(pub.Curve, pub.X, pub.Y)[1:]
+}
+
+func xor(one, other []byte) (xor []byte) {
+	xor = make([]byte, len(one))
+	for i := 0; i < len(one); i++ {
+		xor[i] = one[i] ^ other[i]
+	}
+	return xor
+}

+ 7 - 7
p2p/server.go

@@ -170,7 +170,7 @@ func (server *Server) listenLoop() {
 
 		// accept成功的处理
 		remoteIP := netutil.AddrIP(fd.RemoteAddr())
-		// 检查此IP是是否能加入本地节点的链接
+		// TODO 检查此IP是是否能加入本地节点的链接
 		//if err := server.checkInboundConn(remoteIP); err != nil {
 		//	srv.log.Debug("Rejected inbound connection", "addr", fd.RemoteAddr(), "err", err)
 		//	fd.Close()
@@ -178,12 +178,12 @@ func (server *Server) listenLoop() {
 		//	continue
 		//}
 		if remoteIP != nil {
-			//var addr *net.TCPAddr
-			//if tcp, ok := fd.RemoteAddr().(*net.TCPAddr); ok {
-			//	addr = tcp
-			//}
-			//fd = newMeteredConn(fd, true, addr)
-			//srv.log.Trace("Accepted connection", "addr", fd.RemoteAddr())
+			var addr *net.TCPAddr
+			if tcp, ok := fd.RemoteAddr().(*net.TCPAddr); ok {
+				addr = tcp
+			}
+			fd = newMeteredConn(fd, true, addr)
+			fmt.Printf("Accepted connection, addr: %v.", fd.RemoteAddr())
 		}
 		gopool.Submit(func() {
 			//server.setupConn(fd, inboundConn, nil)

+ 192 - 0
p2p/transport.go

@@ -0,0 +1,192 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package p2p
+
+import (
+	"bytes"
+	"crypto/ecdsa"
+	"fmt"
+	"io"
+	"net"
+	"sync"
+	"time"
+
+	"blockchain-go/common/bitutil"
+	"blockchain-go/common/gopool"
+	"blockchain-go/p2p/rlpx"
+	"blockchain-go/rlp"
+	"github.com/ethereum/go-ethereum/metrics"
+)
+
+const (
+	// total timeout for encryption handshake and protocol
+	// handshake in both directions.
+	handshakeTimeout = 5 * time.Second
+
+	// This is the timeout for sending the disconnect reason.
+	// This is shorter than the usual timeout because we don't want
+	// to wait if the connection is known to be bad anyway.
+	discWriteTimeout = 1 * time.Second
+)
+
+type transport interface {
+	// The two handshakes.
+	doEncHandshake(prv *ecdsa.PrivateKey) (*ecdsa.PublicKey, error)
+	doProtoHandshake(our *protoHandshake) (*protoHandshake, error)
+	// The MsgReadWriter can only be used after the encryption
+	// handshake has completed. The code uses conn.id to track this
+	// by setting it to a non-nil value after the encryption handshake.
+	MsgReadWriter
+	// transports must provide Close because we use MsgPipe in some of
+	// the tests. Closing the actual network connection doesn't do
+	// anything in those tests because MsgPipe doesn't use it.
+	close(err error)
+}
+
+// rlpxTransport is the transport used by actual (non-test) connections.
+// It wraps an RLPx connection with locks and read/write deadlines.
+type rlpxTransport struct {
+	rmu, wmu sync.Mutex
+	wbuf     bytes.Buffer
+	conn     *rlpx.Conn
+}
+
+func newRLPX(conn net.Conn, dialDest *ecdsa.PublicKey) transport {
+	return &rlpxTransport{conn: rlpx.NewConn(conn, dialDest)}
+}
+
+func (t *rlpxTransport) ReadMsg() (Msg, error) {
+	t.rmu.Lock()
+	defer t.rmu.Unlock()
+
+	var msg Msg
+	t.conn.SetReadDeadline(time.Now().Add(frameReadTimeout))
+	code, data, wireSize, err := t.conn.Read()
+	if err == nil {
+		msg = Msg{
+			ReceivedAt: time.Now(),
+			Code:       code,
+			Size:       uint32(len(data)),
+			meterSize:  uint32(wireSize),
+			Payload:    bytes.NewReader(data),
+		}
+	}
+	return msg, err
+}
+
+func (t *rlpxTransport) WriteMsg(msg Msg) error {
+	t.wmu.Lock()
+	defer t.wmu.Unlock()
+
+	// Copy message data to write buffer.
+	t.wbuf.Reset()
+	if _, err := io.CopyN(&t.wbuf, msg.Payload, int64(msg.Size)); err != nil {
+		return err
+	}
+
+	// Write the message.
+	t.conn.SetWriteDeadline(time.Now().Add(frameWriteTimeout))
+	size, err := t.conn.Write(msg.Code, t.wbuf.Bytes())
+	if err != nil {
+		return err
+	}
+
+	// Set metrics.
+	msg.meterSize = size
+	if metrics.Enabled && msg.meterCap.Name != "" { // don't meter non-subprotocol messages
+		m := fmt.Sprintf("%s/%s/%d/%#02x", egressMeterName, msg.meterCap.Name, msg.meterCap.Version, msg.meterCode)
+		metrics.GetOrRegisterMeter(m, nil).Mark(int64(msg.meterSize))
+		metrics.GetOrRegisterMeter(m+"/packets", nil).Mark(1)
+	}
+	return nil
+}
+
+func (t *rlpxTransport) close(err error) {
+	t.wmu.Lock()
+	defer t.wmu.Unlock()
+
+	// Tell the remote end why we're disconnecting if possible.
+	// We only bother doing this if the underlying connection supports
+	// setting a timeout tough.
+	if t.conn != nil {
+		if r, ok := err.(DiscReason); ok && r != DiscNetworkError {
+			deadline := time.Now().Add(discWriteTimeout)
+			if err := t.conn.SetWriteDeadline(deadline); err == nil {
+				// Connection supports write deadline.
+				t.wbuf.Reset()
+				rlp.Encode(&t.wbuf, []DiscReason{r})
+				t.conn.Write(discMsg, t.wbuf.Bytes())
+			}
+		}
+	}
+	t.conn.Close()
+}
+
+func (t *rlpxTransport) doEncHandshake(prv *ecdsa.PrivateKey) (*ecdsa.PublicKey, error) {
+	t.conn.SetDeadline(time.Now().Add(handshakeTimeout))
+	return t.conn.Handshake(prv)
+}
+
+func (t *rlpxTransport) doProtoHandshake(our *protoHandshake) (their *protoHandshake, err error) {
+	// Writing our handshake happens concurrently, we prefer
+	// returning the handshake read error. If the remote side
+	// disconnects us early with a valid reason, we should return it
+	// as the error so it can be tracked elsewhere.
+	werr := make(chan error, 1)
+	gopool.Submit(func() { werr <- Send(t, handshakeMsg, our) })
+	if their, err = readProtocolHandshake(t); err != nil {
+		<-werr // make sure the write terminates too
+		return nil, err
+	}
+	if err := <-werr; err != nil {
+		return nil, fmt.Errorf("write error: %v", err)
+	}
+	// If the protocol version supports Snappy encoding, upgrade immediately
+	t.conn.SetSnappy(their.Version >= snappyProtocolVersion)
+
+	return their, nil
+}
+
+func readProtocolHandshake(rw MsgReader) (*protoHandshake, error) {
+	msg, err := rw.ReadMsg()
+	if err != nil {
+		return nil, err
+	}
+	if msg.Size > baseProtocolMaxMsgSize {
+		return nil, fmt.Errorf("message too big")
+	}
+	if msg.Code == discMsg {
+		// Disconnect before protocol handshake is valid according to the
+		// spec and we send it ourself if the post-handshake checks fail.
+		// We can't return the reason directly, though, because it is echoed
+		// back otherwise. Wrap it in a string instead.
+		var reason [1]DiscReason
+		rlp.Decode(msg.Payload, &reason)
+		return nil, reason[0]
+	}
+	if msg.Code != handshakeMsg {
+		return nil, fmt.Errorf("expected handshake, got %x", msg.Code)
+	}
+	var hs protoHandshake
+	if err := msg.Decode(&hs); err != nil {
+		return nil, err
+	}
+	if len(hs.ID) != 64 || !bitutil.TestBytes(hs.ID) {
+		return nil, DiscInvalidIdentity
+	}
+	return &hs, nil
+}